You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:27 UTC
[85/90] [abbrv] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
new file mode 100644
index 0000000..9ddadb3
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
@@ -0,0 +1,301 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.subscription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.util.OMElementComparator;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionState {
+
+ private final Logger logger = LoggerFactory.getLogger(SubscriptionState.class);
+
+ private long creationTime = 0;
+ private long lastAvailableTime = 0;
+
+ private int unAvailableCounter = 0;
+ private boolean isNeverExpire = false;
+ private boolean isWsrmPolicy;
+
+ public String subId;
+ public String curNotif;
+ private String localTopicString;
+ private String xpathString;
+ private String subscribeXml;
+
+ ConsumerInfo consumerInfo = null;
+
+ EndpointReference consumerReference;
+
+ URI consumerURI = null;
+
+ private OutGoingQueue outGoingQueue;
+
+ /**
+ * @return Returns the creationTime.
+ */
+ public long getCreationTime() {
+ return creationTime;
+ }
+
+ /**
+ * @param creationTime
+ * The creationTime to set.
+ */
+ public void setCreationTime(long creationTime) {
+ this.creationTime = creationTime;
+ }
+
+ public void setId(String id) {
+ subId = id;
+ }
+
+ public String getId() {
+ return subId;
+ }
+
+ public boolean isWsrmPolicy() {
+ return isWsrmPolicy;
+ }
+
+ public void setWsrmPolicy(boolean wsrmPolicy) {
+ this.isWsrmPolicy = wsrmPolicy;
+ }
+
+ // TODO: outGoingQueue is not belong to this class. Move it to elsewhere
+ // related to notification handler in wsntAdapter.
+ public SubscriptionState(EndpointReference consumerRef, boolean useNotify, boolean wsrmEnabled, String topic,
+ String xpath, String type, OutGoingQueue outGoingQueue) {
+ this.consumerReference = consumerRef;
+ try {
+ this.consumerURI = new URI(consumerRef.getAddress());
+ } catch (URISyntaxException e) {
+ // this should not happen
+ logger.error("invalid consumer URI returned by axis om", e);
+
+ }
+ this.outGoingQueue = outGoingQueue;
+ // if (topic == null) {
+ // throw new IllegalArgumentException();
+ // }
+ this.localTopicString = topic;
+ this.xpathString = xpath;
+ this.isWsrmPolicy = wsrmEnabled;
+ consumerInfo = new ConsumerInfo(consumerRef.getAddress(), type, useNotify, false);
+
+ }
+
+ public void resume() {
+ consumerInfo.setPaused(false);
+ }
+
+ public void pause() {
+ consumerInfo.setPaused(true);
+ }
+
+ public String getConsumerIPAddressStr() {
+ return consumerURI.toString();
+ }
+
+ public URI getConsumerAddressURI() {
+ return consumerURI;
+ }
+
+ public String getLocalTopic() {
+ // QName topicExpressionQName =
+ // xsul.util.XsulUtil.toQName(localTopicString, localTopicString
+ // .requiredTextContent());
+ // String topicLocalString = topicExpressionQName.getLocalPart();
+ return localTopicString;
+ }
+
+ /**
+ * @return Returns the consumeReference.
+ */
+ public EndpointReference getConsumerReference() {
+ return consumerReference;
+ }
+
+ /**
+ * @return Returns the curNotif.
+ */
+ public String getCurNotif() {
+ return curNotif;
+ }
+
+ /**
+ * @param curNotif
+ * The curNotif to set.
+ */
+ public void setCurNotif(String curNotif) {
+ this.curNotif = curNotif;
+ }
+
+ /**
+ * @return Returns the outGoingQueue.
+ */
+ public OutGoingQueue getOutGoingQueue() {
+ return outGoingQueue;
+ }
+
+ /**
+ * @param outGoingQueue
+ * The outGoingQueue to set.
+ */
+ public void setOutGoingQueue(OutGoingQueue outGoingQueue) {
+ this.outGoingQueue = outGoingQueue;
+ }
+
+ /**
+ * @return Returns the consumerInfo.
+ */
+ public ConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+ public void resetUnAvailableCounter() {
+ unAvailableCounter = 0;
+ }
+
+ public int addUnAvailableCounter() {
+ unAvailableCounter++;
+ return unAvailableCounter;
+ }
+
+ /**
+ * @return Returns the unAvailableCounter.
+ */
+ public int getUnAvailableCounter() {
+ return unAvailableCounter;
+ }
+
+ public String getXpathString() {
+ return xpathString;
+ }
+
+ public void setXpathString(String xpathString) {
+ this.xpathString = xpathString;
+ }
+
+ /**
+ * @return Returns the isNeverExpire.
+ */
+ public boolean isNeverExpire() {
+ return isNeverExpire;
+ }
+
+ /**
+ * @param isNeverExpire
+ * The isNeverExpire to set.
+ */
+ public void setNeverExpire(boolean neverExpire) {
+ this.isNeverExpire = neverExpire;
+ }
+
+ /**
+ * @return Returns the lastAvailableTime.
+ */
+ public long getLastAvailableTime() {
+ return lastAvailableTime;
+ }
+
+ /**
+ * @param lastAvailableTime
+ * The lastAvailableTime to set.
+ */
+ public void setLastAvailableTime(long lastAvailableTime) {
+ this.lastAvailableTime = lastAvailableTime;
+
+ }
+
+ public void setSubscribeXml(String xml) {
+ subscribeXml = xml;
+ }
+
+ public String getSubscribeXml() {
+ return subscribeXml;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof SubscriptionState) {
+ SubscriptionState subscription = (SubscriptionState) o;
+ return BrokerUtil.sameStringValue(subscription.getLocalTopic(), this.getLocalTopic())
+ && BrokerUtil.sameStringValue(subscription.getXpathString(), this.getXpathString())
+ && BrokerUtil.sameStringValue(subscription.getConsumerIPAddressStr(),
+ this.getConsumerIPAddressStr()) && equalReferenceParameters(subscription);
+ }
+ return false;
+ }
+
+ private boolean equalReferenceParameters(SubscriptionState anotherSubscription) {
+
+ Map<QName, OMElement> otherRefProperties = anotherSubscription.getConsumerReference()
+ .getAllReferenceParameters();
+ Map<QName, OMElement> myRefProperties = getConsumerReference().getAllReferenceParameters();
+
+ /*
+ * Basic comparison
+ */
+ if (otherRefProperties == null && myRefProperties == null) {
+ return true;
+ } else if (otherRefProperties == null || myRefProperties == null) {
+ return false;
+ } else if (otherRefProperties.size() != myRefProperties.size()) {
+ return false;
+ }
+
+ /*
+ * This OMElementComparator supports ignore list, but we don't use it here.
+ */
+ Iterator<Entry<QName, OMElement>> iterator = otherRefProperties.entrySet().iterator();
+ while (iterator.hasNext()) {
+
+ Entry<QName, OMElement> entry = iterator.next();
+ if (!myRefProperties.containsKey(entry.getKey())) {
+ return false;
+ }
+
+ OMElement myElement = myRefProperties.get(entry.getKey());
+ OMElement otherElement = entry.getValue();
+
+ if (!OMElementComparator.compare(myElement, otherElement)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
new file mode 100644
index 0000000..fe276bf
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.airavata.wsmg.util.WsEventingOperations;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSEProcessingContextBuilder extends ProcessingContextBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(WSEProcessingContextBuilder.class);
+
+ public ProcessingContext build(OMElement elem) {
+
+ ProcessingContext processingContext = new ProcessingContext();
+
+ if (elem != null && elem.getLocalName().equals("Subscribe")) {
+ logger.debug("found subscribe element");
+ onSubscription(processingContext, elem);
+
+ }
+
+ return processingContext;
+ }
+
+ public ProcessingContext build(SOAPEnvelope elem) {
+
+ ProcessingContext context = null;
+
+ SOAPBody soapBody = elem.getBody();
+ if (soapBody != null) {
+
+ context = build(soapBody.getFirstElement());
+
+ } else {
+ context = build((OMElement) null);
+ }
+
+ context.setEnvelope(elem);
+ extractInfoFromHeader(context, elem.getHeader());
+
+ return context;
+ }
+
+ public ProcessingContext build(MessageContext msgContext, WsEventingOperations operation) {
+
+ ProcessingContext processingContext = new ProcessingContext();
+
+ switch (operation) {
+ case SUBSCRIBE: {
+
+ Iterator<OMElement> iterator = msgContext.getEnvelope().getBody()
+ .getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "Subscribe"));
+
+ if (!iterator.hasNext()) {
+ throw new RuntimeException("invalid subscription message - no subscribe element");
+ }
+
+ onSubscription(processingContext, iterator.next());
+ }
+ break;
+
+ }
+
+ processingContext.setMessageConext(msgContext);
+ processingContext.setEnvelope(msgContext.getEnvelope());
+ extractInfoFromHeader(processingContext, msgContext.getEnvelope().getHeader());
+ String topicFromUrl = BrokerUtil.getTopicFromRequestPath(msgContext.getTo().getAddress());
+
+ processingContext.setContextParameter(ContextParameters.TOPIC_FROM_URL, topicFromUrl);
+
+ return processingContext;
+ }
+
+ /**
+ * @param processingContext
+ * @param subscribeElement
+ */
+ private void onSubscription(ProcessingContext processingContext, OMElement subscribeElement) {
+
+ processingContext.setContextParameter(ContextParameters.SUBSCRIBE_ELEMENT, subscribeElement);
+
+ // -- check optional element - expires
+ Iterator iterator = subscribeElement.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
+ "Expires"));
+
+ if (iterator.hasNext()) {
+
+ processingContext.setContextParameter(ContextParameters.SUBSCRIBER_EXPIRES,
+ ((OMElement) iterator.next()).getText());
+
+ }
+
+ iterator = subscribeElement
+ .getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "Filter"));
+
+ if (!iterator.hasNext()) {
+
+ throw new RuntimeException("invalid subscription - unable to find filter dialet");
+
+ }
+
+ processingContext.setContextParameter(ContextParameters.FILTER_ELEMENT, iterator.next());
+
+ iterator = subscribeElement.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
+ "Delivery"));
+
+ if (!iterator.hasNext()) {
+ throw new RuntimeException("invalid subscription - unable to find delivery tag");
+ }
+
+ OMElement delivery = (OMElement) iterator.next();
+
+ iterator = delivery.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "NotifyTo"));
+
+ if (!iterator.hasNext()) {
+ throw new RuntimeException("invalid subscription - unable to find NotifyTo tag");
+ }
+
+ OMElement notifyToElement = (OMElement) iterator.next();
+
+ processingContext.setContextParameter(ContextParameters.NOTIFY_TO_ELEMENT, notifyToElement);
+
+ try {
+
+ processingContext.setContextParameter(ContextParameters.NOTIFY_TO_EPR,
+ EndpointReferenceHelper.fromOM(notifyToElement));
+
+ } catch (AxisFault e) {
+ throw new RuntimeException("invalid subscription - unable to parse notify to end point reference", e);
+ }
+
+ }
+
+ private void extractInfoFromHeader(ProcessingContext context, SOAPHeader header) {
+
+ Iterator ite = header.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
+ WsmgCommonConstants.SUBSCRIPTION_ID));
+
+ if (ite.hasNext()) {
+ OMElement identifier = (OMElement) ite.next();
+ logger.debug("extracted identifier " + identifier.getText());
+
+ context.setContextParameter(ContextParameters.SUB_ID, identifier.getText());
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
new file mode 100644
index 0000000..f17a980
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import java.util.Calendar;
+import java.util.Date;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSEProtocolSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(WSEProtocolSupport.class);
+
+ public SubscriptionState createSubscriptionState(ProcessingContext ctx, OutGoingQueue outgoingQueue)
+ throws AxisFault {
+
+ boolean neverExpire = false; // is true if expiration time is less than
+ String topicLocalString = "";
+ String xpathString = "";
+ EndpointReference consumerReference = ctx.getContextParameter(ContextParameters.NOTIFY_TO_EPR);
+
+ if (consumerReference == null) {
+ throw new AxisFault("Only Push delivery Mode (NotifyTo) is supported in WSE");
+ }
+
+ String expireTimeString = ctx.getContextParameter(ContextParameters.SUBSCRIBER_EXPIRES);
+
+ if (expireTimeString == null) {
+ neverExpire = true;
+
+ } else {
+
+ long expireTime = Long.valueOf(expireTimeString);
+ if (expireTime < 0) {
+ neverExpire = true;
+ }
+ }
+
+ OMElement filterEl = ctx.getContextParameter(ContextParameters.FILTER_ELEMENT);
+
+ if (filterEl == null) {
+
+ topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
+
+ if (topicLocalString == null) {
+ topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
+ }
+
+ log.debug("got topicLocalString=" + topicLocalString);
+ // topicLocalString = "wseTopic";
+ // // a special topic, used in WSNT and JMS. Do not use a
+ // wildcard topic here since wildcard string varies by system
+ } else {
+
+ String filterDialectAttrib = filterEl.getAttributeValue(new QName(null, "Dialect"));
+
+ if (filterDialectAttrib.compareTo(WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT) == 0) {
+ topicLocalString = BrokerUtil.getTopicLocalString(filterEl.getText()); // get what ever inside this
+ // element
+
+ if (topicLocalString == null) {
+ throw new AxisFault("topic is not given in the subscription");
+ }
+
+ } else if (filterDialectAttrib.compareTo(WsmgCommonConstants.XPATH_DIALECT) == 0) {
+
+ // use topicFromUrl if
+ // was provided
+ topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
+
+ xpathString = filterEl.getText();
+
+ log.debug("got topicLocalString=" + topicLocalString + " xpathString=" + xpathString);
+
+ // TODO: Add XPath canonicalization here in the parsing. To
+ // generate a
+ // canonicalized XPath string
+ // Possibly use Query query =
+ // XPQuery.parseQuery(xpathExpression, index);
+ if (xpathString == null) {
+ throw new AxisFault("xpath expression is not given");
+ }
+ } else if (filterDialectAttrib.compareTo(WsmgCommonConstants.TOPIC_AND_XPATH_DIALECT) == 0) {
+ OMElement topicEl = filterEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "TopicExpression"));
+ if (topicEl != null) {
+ topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
+ }
+ OMElement xpathEl = filterEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "MessageContent"));
+ if (xpathEl != null) {
+ xpathString = xpathEl.getText();
+ if (xpathString == null && topicLocalString == null) {
+ throw new AxisFault("Both topic string and " + "XPath String are null!");
+ }
+ }
+ } else {
+ throw new AxisFault("Unkown dialect: ");
+ // topicLocalString = "wseTopic"; //a special topic, used in
+ // WSNT and JMS
+ }
+
+ }
+
+ if (topicLocalString == null || topicLocalString.length() == 0) {
+ topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
+ }
+
+ // Create SubscriptionState Object
+ SubscriptionState state = new SubscriptionState(consumerReference, true, false, topicLocalString, xpathString,
+ "wse", outgoingQueue);
+
+ state.setNeverExpire(neverExpire); // default false
+
+ return state;
+ }
+
+ public void createSubscribeResponse(ProcessingContext ctx, String subId) throws AxisFault {
+
+ OMFactory factory = OMAbstractFactory.getOMFactory();
+
+ ctx.addResponseMsgNameSpaces(NameSpaceConstants.WSE_NS);
+
+ OMElement responseMessage = factory.createOMElement("SubscribeResponse", NameSpaceConstants.WSE_NS);
+
+ OMElement identifier = factory.createOMElement(WsmgCommonConstants.SUBSCRIPTION_ID,
+ responseMessage.getNamespace());
+ identifier.setText(subId);
+ EndpointReference serviceLocationEndpointReference = new EndpointReference(ctx.getMessageContext()
+ .getAxisService().getEndpointURL());
+
+ serviceLocationEndpointReference.addReferenceParameter(identifier);
+
+ OMElement expiresEl = factory.createOMElement("Expires", responseMessage.getNamespace(), responseMessage);
+
+ Date expiration = getFutureExpirationDate();
+ String dateString = CommonRoutines.getXsdDateTime(expiration);
+ expiresEl.setText(dateString);
+
+ OMElement subscriptionManagerEpr = null;
+ try {
+
+ subscriptionManagerEpr = EndpointReferenceHelper.toOM(factory, serviceLocationEndpointReference, new QName(
+ NameSpaceConstants.WSE_NS.getNamespaceURI(), "SubscriptionManager"), NameSpaceConstants.WSA_NS
+ .getNamespaceURI());
+
+ responseMessage.addChild(subscriptionManagerEpr);
+ subscriptionManagerEpr.setNamespace(responseMessage.getNamespace());
+ } catch (AxisFault e) {
+ log.error("unable to resolve EPR from OM", e);
+ throw e;
+ }
+
+ ctx.setRespMessage(responseMessage);
+ }
+
+ private Date getFutureExpirationDate() {
+ // Get a Calendar for current locale and time zone
+ Calendar cal = Calendar.getInstance();
+ // currentDate.setDate(currentDate.getDate()+1);
+ // Get a Date object that represents 30 days from now
+ Date currentDate = new Date(); // Current date
+ cal.setTime(currentDate); // Set it in the Calendar object
+ cal.add(Calendar.DATE, 30); // Add 30 days
+ Date expiration = cal.getTime(); // Retrieve the resulting date
+ return expiration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
new file mode 100644
index 0000000..6af3bc5
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
+import org.apache.airavata.wsmg.util.WsEventingOperations;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BrokerServiceMessageReceiverInOut message receiver
+ */
+
+public class WSEventingMsgReceiver extends AbstractBrokerMsgReceiver {
+
+ private static final Logger log = LoggerFactory.getLogger(WSEventingMsgReceiver.class);
+ WSEProcessingContextBuilder builder = new WSEProcessingContextBuilder();
+
+ public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
+
+ WsEventingOperations msgType = WsEventingOperations.valueFrom(operationName);
+ ProcessingContext processingContext = builder.build(inMsg, msgType);
+ MessageContext outputMsg = null;
+
+ log.debug("WS-Eventing: " + msgType);
+
+ switch (msgType) {
+ case SUBSCRIBE: {
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+ .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case UNSUBSCRIBE: {
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+ .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case RENEW:
+ case GET_STATUS:
+ case SUBSCRIPTION_END:
+ default:
+ throw new AxisFault("unsupported operation" + msgType.toString());
+
+ }
+ return outputMsg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
new file mode 100644
index 0000000..63c6bbc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
+import org.apache.airavata.wsmg.util.WsEventingOperations;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+
+public class WSEventingPublishMsgReceiver extends AbstractBrokerMsgReceiver {
+
+ WSEProcessingContextBuilder builder = new WSEProcessingContextBuilder();
+
+ @Override
+ protected MessageContext process(MessageContext inMsgContext, String operationName) throws AxisFault {
+
+ ProcessingContext processingContext = builder.build(inMsgContext, WsEventingOperations.PUBLISH);
+
+ try {
+
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsgContext
+ .getConfigurationContext().getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getNotificationProcessor().processMsg(processingContext, NameSpaceConstants.WSE_NS);
+ } catch (Exception e) {
+ throw new AxisFault("unable to process message", e);
+ }
+ return createOutputMessageContext(inMsgContext, processingContext);
+ }
+
+ @Override
+ protected MessageContext createOutputMessageContext(MessageContext inMsg, ProcessingContext processingContext)
+ throws AxisFault {
+
+ MessageContext outputContext = null;
+
+ OMElement responseMessage = processingContext.getRespMessage();
+ if (responseMessage != null) {
+
+ outputContext = super.createOutputMessageContext(inMsg, processingContext);
+
+ String responseAction = String.format("%s/%s", NameSpaceConstants.WSE_NS.getNamespaceURI(),
+ responseMessage.getLocalName());
+
+ outputContext.setSoapAction(responseAction);
+ }
+
+ return outputContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
new file mode 100644
index 0000000..c8d3e37
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
@@ -0,0 +1,237 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wsnotification;
+
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSNTProtocolSupport {
+
+ private static final Logger log = LoggerFactory.getLogger(WSNTProtocolSupport.class);
+
+ public SubscriptionState createSubscriptionState(ProcessingContext ctx, OutGoingQueue outgoingQueue)
+ throws AxisFault {
+
+ EndpointReference consumerReference = ctx.getContextParameter(ContextParameters.NOTIFY_TO_EPR);
+
+ if (consumerReference == null) {
+ throw new AxisFault("Only Push delivery Mode (NotifyTo) is supported");
+ }
+
+ boolean neverExpire = false; // is true if expiration time is less than
+ boolean useNotify = true; // notify by event notifications
+ boolean wsrmEnabled = false;
+ String topicLocalString = "";
+ String xpathString = "";
+
+ String expireTimeString = ctx.getContextParameter(ContextParameters.SUBSCRIBER_EXPIRES);
+
+ if (expireTimeString == null) {
+ neverExpire = true;
+ } else {
+ long expireTime = Long.valueOf(expireTimeString);
+ if (expireTime < 0) {
+ neverExpire = true;
+ }
+ }
+
+ OMElement useNotifyEl = ctx.getContextParameter(ContextParameters.USE_NOTIFY_ELEMENT);
+ if (useNotifyEl != null) {
+ String s = useNotifyEl.getText();
+ useNotify = Boolean.valueOf(s);
+ }
+
+ // get policy if exist
+ OMElement element = ctx.getContextParameter(ContextParameters.SUB_POLICY);
+ if (element != null) {
+ wsrmEnabled = true;
+ }
+
+ OMElement topicExpressionEl = ctx.getContextParameter(ContextParameters.TOPIC_EXPRESSION_ELEMENT);
+
+ if (topicExpressionEl != null) {
+ topicLocalString = BrokerUtil.getTopicLocalString(topicExpressionEl.getText());
+ }
+
+ OMElement xpathEl = ctx.getContextParameter(ContextParameters.XPATH_ELEMENT);
+
+ if (xpathEl != null) {
+ xpathString = BrokerUtil.getXPathString(xpathEl);
+ }
+ if (xpathString == null && topicLocalString == null) {
+ throw new AxisFault("Both topic string and XPath String are null!");
+
+ }
+
+ if (topicLocalString == null || topicLocalString.length() == 0) {
+ topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
+ }
+
+ // Create SubscriptionState Object
+ SubscriptionState state = new SubscriptionState(consumerReference, useNotify, wsrmEnabled, topicLocalString,
+ xpathString, "wsnt", outgoingQueue);
+
+ state.setNeverExpire(neverExpire); // default false
+
+ return state;
+ }
+
+ public void createSubscribeResponse(ProcessingContext ctx, String subId) throws AxisFault {
+
+ OMFactory factory = OMAbstractFactory.getOMFactory();
+
+ ctx.addResponseMsgNameSpaces(NameSpaceConstants.WSNT_NS);
+ OMElement responseMessage = factory.createOMElement("SubscribeResponse", NameSpaceConstants.WSNT_NS);
+
+ OMElement identifier = factory.createOMElement(WsmgCommonConstants.SUBSCRIPTION_ID,
+ responseMessage.getNamespace());
+ identifier.setText(subId);
+ EndpointReference serviceLocationEndpointReference = new EndpointReference(ctx.getMessageContext()
+ .getAxisService().getEndpointURL());
+ serviceLocationEndpointReference.addReferenceParameter(identifier);
+
+ OMElement subscriptionReference = null;
+ try {
+ subscriptionReference = EndpointReferenceHelper.toOM(factory, serviceLocationEndpointReference, new QName(
+ "SubscriptionReference"), NameSpaceConstants.WSA_NS.getNamespaceURI());
+
+ responseMessage.addChild(subscriptionReference);
+ subscriptionReference.setNamespace(responseMessage.getNamespace());
+
+ } catch (AxisFault e) {
+ log.error("unable to resolve EPR from OM", e);
+ throw e;
+ }
+
+ ctx.setRespMessage(responseMessage);
+ }
+
+ public static class Client {
+
+ public static OMElement createSubscriptionMsg(EndpointReference eventSinkLocation, String topicExpression,
+ String xpathExpression) throws AxisFault {
+ OMFactory factory = OMAbstractFactory.getOMFactory();
+
+ OMElement message = factory.createOMElement("SubscribeRequest", NameSpaceConstants.WSNT_NS);
+
+ if (topicExpression != null) {
+ OMElement topicExpEl = factory.createOMElement("TopicExpression", NameSpaceConstants.WSNT_NS, message);
+
+ topicExpEl.addAttribute("Dialect", WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT,
+ NameSpaceConstants.WSNT_NS);
+ topicExpEl.declareNamespace(NameSpaceConstants.WIDGET_NS);
+ topicExpEl.setText(NameSpaceConstants.WIDGET_NS.getPrefix() + ":" + topicExpression);
+ }
+
+ if (xpathExpression != null) {
+ OMElement xpathExpEl = factory.createOMElement("Selector", NameSpaceConstants.WSNT_NS, message);
+ xpathExpEl.addAttribute("Dialect", WsmgCommonConstants.XPATH_DIALECT, null);
+ xpathExpEl.setText(xpathExpression);
+ }
+
+ OMElement useNotifyEl = factory.createOMElement("UseNotify", message.getNamespace(), message);
+ useNotifyEl.setText("true");// check wether we still need this
+
+ OMElement eprCrEl = EndpointReferenceHelper.toOM(factory, eventSinkLocation,
+ new QName("ConsumerReference"), NameSpaceConstants.WSA_NS.getNamespaceURI());
+
+ message.addChild(eprCrEl);
+ eprCrEl.setNamespace(message.getNamespace());
+
+ return message;
+ }
+
+ public static String decodeSubscriptionResponse(OMElement subscriptionReference) throws AxisFault {
+
+ String subscriptionId = null;
+
+ EndpointReference subscriptionReferenceEPR = EndpointReferenceHelper.fromOM(subscriptionReference);
+
+ Map<QName, OMElement> referenceParams = subscriptionReferenceEPR.getAllReferenceParameters();
+
+ if (referenceParams != null) {
+ QName identifierQName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(),
+ WsmgCommonConstants.SUBSCRIPTION_ID);
+
+ OMElement identifierEl = referenceParams.get(identifierQName);
+ subscriptionId = (identifierEl != null) ? identifierEl.getText() : null;
+
+ }
+
+ return subscriptionId;
+ }
+
+ public static OMElement createUnsubscribeMsg() {
+ OMFactory factory = OMAbstractFactory.getOMFactory();
+ OMElement message = factory.createOMElement("UnsubsribeRequest", NameSpaceConstants.WSNT_NS);
+
+ return message;
+ }
+
+ public static OMElement encodeNotification(String topic, OMElement message, EndpointReference producerReference)
+ throws AxisFault {
+ OMFactory factory = OMAbstractFactory.getOMFactory();
+
+ OMElement topicExpEl = factory.createOMElement("Topic", NameSpaceConstants.WSNT_NS);
+ topicExpEl.addAttribute("Dialect", WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT, null);
+ topicExpEl.declareNamespace(NameSpaceConstants.WIDGET_NS);
+ topicExpEl.setText(NameSpaceConstants.WIDGET_NS.getPrefix() + ":" + topic);
+
+ OMElement messageToNotify = factory.createOMElement("Notify", NameSpaceConstants.WSNT_NS);
+ messageToNotify.declareNamespace(NameSpaceConstants.WSNT_NS);
+ messageToNotify.declareNamespace(NameSpaceConstants.WSA_NS);
+ OMElement notificationMesssageEl = factory.createOMElement("NotificationMessage",
+ messageToNotify.getNamespace(), messageToNotify);
+
+ notificationMesssageEl.addChild(topicExpEl);
+
+ notificationMesssageEl.addChild(EndpointReferenceHelper.toOM(factory, producerReference, new QName(
+ notificationMesssageEl.getNamespace().getNamespaceURI(), "ProducerReference",
+ notificationMesssageEl.getNamespace().getPrefix()), NameSpaceConstants.WSA_NS.getNamespaceURI()));
+
+ OMElement messageEl = factory.createOMElement("Message", notificationMesssageEl.getNamespace(),
+ notificationMesssageEl);
+
+ messageEl.addChild(message);
+ return messageToNotify;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
new file mode 100644
index 0000000..9d32ee9
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wsnotification;
+
+import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
+import org.apache.airavata.wsmg.util.WsNotificationOperations;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BrokerServiceMessageReceiverInOut message receiver
+ */
+
+public class WSNotificationMsgReceiver extends AbstractBrokerMsgReceiver {
+
+ private static final Logger log = LoggerFactory.getLogger(WSNotificationMsgReceiver.class);
+
+ WSNotificationProcessingContextBuilder builder = new WSNotificationProcessingContextBuilder();
+
+ public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
+
+ WsNotificationOperations msgType = WsNotificationOperations.valueFrom(operationName);
+
+ ProcessingContext processingContext = builder.build(inMsg, msgType);
+
+ MessageContext outputMsg = null;
+
+ switch (msgType) {
+ case NOTIFY: {
+ try {
+
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg
+ .getConfigurationContext().getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getNotificationProcessor()
+ .processMsg(processingContext, NameSpaceConstants.WSNT_NS);
+ } catch (Exception e) {
+ throw new AxisFault("unable to process message", e);
+ }
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case SUBSCRIBE: {
+
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+ .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+ brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case UNSUBSCRIBE: {
+
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+ .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case RESUME_SUBSCRIPTION: {
+
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+ .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getSubscriptionManager().resumeSubscription(processingContext);
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case PAUSE_SUBSCRIPTION: {
+ WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+ .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+ brokerConfigContext.getSubscriptionManager().pauseSubscription(processingContext);
+ outputMsg = createOutputMessageContext(inMsg, processingContext);
+ break;
+ }
+ case GET_CURRENT_MSG:
+ default:
+ throw new AxisFault("not implemented yet");
+ }
+
+ return outputMsg;
+ }
+
+}// end of class
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
new file mode 100644
index 0000000..b517234
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wsnotification;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.airavata.wsmg.util.WsNotificationOperations;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSNotificationProcessingContextBuilder extends ProcessingContextBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(WSNotificationProcessingContextBuilder.class);
+
+ public ProcessingContext build(OMElement elem) {
+
+ ProcessingContext processingContext = new ProcessingContext();
+
+ if (elem == null
+ || (!elem.getNamespace().getNamespaceURI().equals(NameSpaceConstants.WSNT_NS.getNamespaceURI()))) {
+
+ logger.warn("invalid message payload recieved: " + elem);
+
+ return processingContext;
+ }
+
+ String localName = elem.getLocalName();
+
+ if (localName.equals("SubscribeRequest")) {
+ onSubscription(processingContext, elem);
+ }
+ return processingContext;
+ }
+
+ public void onSubscription(ProcessingContext context, OMElement subscribeElement) {
+ context.setContextParameter(ContextParameters.SUBSCRIBE_ELEMENT, subscribeElement);
+
+ OMElement consumerReference = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "ConsumerReference"));
+ if (consumerReference == null) {
+ logger.warn("unable to find consumer reference" + " in subscribe message: " + subscribeElement);
+ return;
+ }
+ context.setContextParameter(ContextParameters.NOTIFY_TO_ELEMENT, consumerReference);
+
+ try {
+ EndpointReference consumerEpr = EndpointReferenceHelper.fromOM(consumerReference);
+ context.setContextParameter(ContextParameters.NOTIFY_TO_EPR, consumerEpr);
+
+ } catch (AxisFault e) {
+ logger.warn("invalid epr", e);
+ return;
+ }
+
+ OMElement topicExpression = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "TopicExpression"));
+
+ if (topicExpression != null) { // topic can be null
+
+ context.setContextParameter(ContextParameters.TOPIC_EXPRESSION_ELEMENT, topicExpression);
+
+ }
+
+ OMElement useNotify = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "UseNotify"));
+
+ if (useNotify != null) {
+ context.setContextParameter(ContextParameters.USE_NOTIFY_ELEMENT, useNotify);
+ }
+
+ OMElement selector = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "Selector"));
+
+ if (selector != null) {
+ context.setContextParameter(ContextParameters.XPATH_ELEMENT, selector);
+ }
+
+ OMElement subscriptionPolicy = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), WsmgCommonConstants.SUBSCRIPTION_POLICY));
+
+ if (subscriptionPolicy != null) {
+ context.setContextParameter(ContextParameters.SUB_POLICY, subscriptionPolicy);
+ }
+
+ }
+
+ public ProcessingContext build(MessageContext msgContext, WsNotificationOperations operation) {
+ ProcessingContext context = new ProcessingContext();
+
+ SOAPEnvelope soapEnvelope = msgContext.getEnvelope();
+
+ if (soapEnvelope == null) {
+ throw new RuntimeException("invalid message context - envelope is not found");
+ }
+
+ SOAPBody soapBody = soapEnvelope.getBody();
+
+ if (soapBody == null) {
+ throw new RuntimeException("invalid message context - soap envelope is not found");
+ }
+
+ SOAPHeader soapHeader = soapEnvelope.getHeader();
+
+ if (soapHeader == null) {
+ throw new RuntimeException("invalid message context - soap header is not found");
+ }
+
+ switch (operation) {
+ case SUBSCRIBE: {
+
+ Iterator<OMElement> iterator = soapBody.getChildrenWithName(new QName(NameSpaceConstants.WSNT_NS
+ .getNamespaceURI(), "SubscribeRequest"));
+ if (!iterator.hasNext()) {
+ throw new RuntimeException("invalid message context - unable to find Subscribe information");
+ }
+
+ onSubscription(context, iterator.next());
+ }
+ break;
+ }
+
+ context.setEnvelope(soapEnvelope);
+ extractInfoFromHeader(context, soapHeader);
+ context.setMessageConext(msgContext);
+ String topicFromUrl = BrokerUtil.getTopicFromRequestPath(msgContext.getTo().getAddress());
+ context.setContextParameter(ContextParameters.TOPIC_FROM_URL, topicFromUrl);
+
+ return context;
+ }
+
+ private void extractInfoFromHeader(ProcessingContext context, SOAPHeader header) {
+
+ Iterator ite = header.getChildrenWithName(new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(),
+ WsmgCommonConstants.SUBSCRIPTION_ID));
+ if (ite.hasNext()) {
+ OMElement identifier = (OMElement) ite.next();
+ logger.debug("extracted identifier " + identifier.getText());
+
+ context.setContextParameter(ContextParameters.SUB_ID, identifier.getText());
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
new file mode 100644
index 0000000..73eba17
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+public class OutGoingMessage implements Serializable {
+ /**
+ *
+ */
+ private static final long serialVersionUID = -6999667921413261492L;
+
+ String textMessage;
+
+ AdditionalMessageContent additionalMessageContent;
+
+ List<ConsumerInfo> consumerInfoList = null;
+
+ // ConsumerInfo consumerInfo=null;
+
+ /**
+ *
+ */
+ public OutGoingMessage() {
+ // super();
+ // TODO Auto-generated constructor stub
+ // consumerInfo=new ConsumerInfo();
+ }
+
+ /**
+ * @param textMessage
+ * @param additionalMessageContent
+ * @param consumerInfoList
+ */
+ public OutGoingMessage(String textMessage, AdditionalMessageContent additionalMessageContent,
+ List<ConsumerInfo> consumerInfoList) {
+ super();
+ // TODO Auto-generated constructor stub
+ this.textMessage = textMessage;
+ this.additionalMessageContent = additionalMessageContent;
+ this.consumerInfoList = consumerInfoList;
+ }
+
+ /**
+ * @param consumerInfo
+ * The consumerInfo to set.
+ */
+ public void addConsumerInfo(ConsumerInfo consumerInfo) {
+ // this.consumerInfo = consumerInfo;
+ if (consumerInfoList == null) {
+ consumerInfoList = new ArrayList<ConsumerInfo>();
+ }
+ consumerInfoList.add(consumerInfo);
+ }
+
+ /**
+ * @return Returns the textMessage.
+ */
+ public String getTextMessage() {
+ return textMessage;
+ }
+
+ /**
+ * @param textMessage
+ * The textMessage to set.
+ */
+ public void setTextMessage(String textMessage) {
+ this.textMessage = textMessage;
+ }
+
+ /**
+ * @return Returns the consumerInfoList.
+ */
+ public List<ConsumerInfo> getConsumerInfoList() {
+ return consumerInfoList;
+ }
+
+ /**
+ * @param consumerInfoList
+ * The consumerInfoList to set.
+ */
+ public void setConsumerInfoList(List<ConsumerInfo> consumerInfoList) {
+ this.consumerInfoList = consumerInfoList;
+ }
+
+ /**
+ * @return Returns the soapHeader.
+ */
+ public AdditionalMessageContent getAdditionalMessageContent() {
+ return additionalMessageContent;
+ }
+
+ /**
+ * @param soapHeader
+ * The soapHeader to set.
+ */
+ public void setAdditionalMessageContent(AdditionalMessageContent soapHeader) {
+ this.additionalMessageContent = soapHeader;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
new file mode 100644
index 0000000..6b051b1
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+public class WsmgInMemoryStorage implements WsmgStorage, WsmgQueue {
+
+ private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+ private Map<String, SubscriptionState> expirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
+
+ private Map<String, SubscriptionState> unexpirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
+
+ public void dispose() {
+ queue.clear();
+ expirableSubscriptions.clear();
+ unexpirableSubscriptions.clear();
+ }
+
+ public int insert(SubscriptionState subscription) {
+ if (subscription.isNeverExpire()) {
+ unexpirableSubscriptions.put(subscription.getId(), subscription);
+ } else {
+ expirableSubscriptions.put(subscription.getId(), subscription);
+ }
+ return 0;
+ }
+
+ public int delete(String subscriptionId) {
+ expirableSubscriptions.remove(subscriptionId);
+ unexpirableSubscriptions.remove(subscriptionId);
+ return 0;
+ }
+
+ public List<SubscriptionEntry> getAllSubscription() {
+
+ List<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>(expirableSubscriptions.size()
+ + unexpirableSubscriptions.size());
+
+ Collection<SubscriptionState> entries = expirableSubscriptions.values();
+
+ for (SubscriptionState s : entries) {
+ SubscriptionEntry se = new SubscriptionEntry();
+ se.setSubscribeXml(s.getSubscribeXml());
+ se.setSubscriptionId(s.getId());
+ ret.add(se);
+ }
+ entries = unexpirableSubscriptions.values();
+ for (SubscriptionState s : entries) {
+ SubscriptionEntry se = new SubscriptionEntry();
+ se.setSubscribeXml(s.getSubscribeXml());
+ se.setSubscriptionId(s.getId());
+ ret.add(se);
+ }
+
+ return ret;
+ }
+
+ public Object blockingDequeue() {
+ Object obj = null;
+
+ try {
+ obj = queue.take();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("interruped exception occured", ie);
+ }
+
+ return obj;
+ }
+
+ public void cleanup() {
+ queue.clear();
+ }
+
+ public void enqueue(Object object, String trackId) {
+ queue.offer(object);
+ }
+}