You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2009/03/20 11:47:57 UTC
svn commit: r756411 - in /synapse/trunk/java: ./
modules/core/src/main/java/org/apache/synapse/config/xml/eventing/
modules/core/src/main/java/org/apache/synapse/eventing/
modules/core/src/main/java/org/apache/synapse/eventing/builders/
modules/core/sr...
Author: asanka
Date: Fri Mar 20 10:47:56 2009
New Revision: 756411
URL: http://svn.apache.org/viewvc?rev=756411&view=rev
Log:
Use the eventing core implementation from the eventing API (1.2) and remove the custom implementation that was done for Synapse (the custom filter and EPR handling are removed).
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
synapse/trunk/java/pom.xml
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java Fri Mar 20 10:47:56 2009
@@ -156,25 +156,15 @@
OMElement elmFilter = elmSubscription.getFirstChildWithName(FILTER_QNAME);
OMAttribute dialectAttr = elmFilter.getAttribute(FILTER_DIALECT_QNAME);
if (dialectAttr != null && dialectAttr.getAttributeValue() != null) {
- if (SynapseEventingConstants.TOPIC_FILTER_DIALECT
- .equals(dialectAttr.getAttributeValue())) {
- XPathBasedEventFilter filter = new XPathBasedEventFilter();
+
OMAttribute sourceAttr = elmFilter.getAttribute(FILTER_SOURCE_QNAME);
if (sourceAttr != null) {
- filter.setResultValue(sourceAttr.getAttributeValue());
+ synapseSubscription.setFilterDialect(dialectAttr.getAttributeValue());
+ synapseSubscription.setFilterValue(elmFilter.getText());
} else {
handleException(
"Error in creating static subscription. Filter source not defined");
- }
- synapseSubscription.setFilter(filter);
- SubscriptionData subscriptionData = new SubscriptionData();
- subscriptionData.setProperty(SynapseEventingConstants.FILTER_VALUE,
- sourceAttr.getAttributeValue());
- subscriptionData.setProperty(SynapseEventingConstants.FILTER_DIALECT,
- dialectAttr.getAttributeValue());
- synapseSubscription.setSubscriptionData(subscriptionData);
- }
-
+ }
} else {
handleException(
"Error in creating static subscription. Filter dialect not defined");
@@ -183,13 +173,8 @@
if (elmEndpoint != null) {
OMElement elmAddress = elmEndpoint.getFirstChildWithName(ADDRESS_QNAME);
if (elmAddress != null) {
- AddressEndpoint endpoint = new AddressEndpoint();
- EndpointDefinition def = new EndpointDefinition();
OMAttribute uriAttr = elmAddress.getAttribute(EP_URI_QNAME);
if (uriAttr != null) {
- def.setAddress(uriAttr.getAttributeValue());
- endpoint.setDefinition(def);
- synapseSubscription.setEndpoint(endpoint);
synapseSubscription.setEndpointUrl(uriAttr.getAttributeValue());
synapseSubscription.setAddressUrl(uriAttr.getAttributeValue());
} else {
@@ -219,7 +204,8 @@
} else {
synapseSubscription.setExpires(null);
}
- synapseSubscription.setStaticEntry(true);
+ synapseSubscription.getSubscriptionData()
+ .setProperty(SynapseEventingConstants.STATIC_ENTRY, "true");
synapseEventSource.getSubscriptionManager().addSubscription(synapseSubscription);
}
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java Fri Mar 20 10:47:56 2009
@@ -86,11 +86,9 @@
OMElement filterElem =
fac.createOMElement("filter", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
filterElem.addAttribute(fac.createOMAttribute("source", nullNS,
- (String) staticSubscription.getSubscriptionData()
- .getProperty(SynapseEventingConstants.FILTER_VALUE)));
+ (String) staticSubscription.getFilterValue()));
filterElem.addAttribute(fac.createOMAttribute("dialect", nullNS,
- (String) staticSubscription.getSubscriptionData()
- .getProperty(SynapseEventingConstants.FILTER_DIALECT)));
+ (String) staticSubscription.getFilterDialect()));
staticSubElem.addChild(filterElem);
OMElement endpointElem =
fac.createOMElement("endpoint", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java Fri Mar 20 10:47:56 2009
@@ -20,6 +20,7 @@
package org.apache.synapse.eventing;
import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisOperation;
@@ -31,6 +32,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
@@ -185,7 +189,8 @@
synCtx.setProperty(SynapseConstants.OUT_ONLY,
"true"); // Set one way message for events
try {
- subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
+ getEndpointFromURL(subscription.getEndpointUrl())
+ .send(MessageHelper.cloneMessageContext(synCtx));
} catch (AxisFault axisFault) {
log.error("Event sending failure " + axisFault.toString());
}
@@ -365,8 +370,21 @@
SubscriptionMessageBuilder.getErrorCode(),
SubscriptionMessageBuilder.getErrorSubCode(),
SubscriptionMessageBuilder.getErrorReason(), "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc, true);
}
}
+
+ /**
+ * Create an Endpoint for a given URL
+ *
+ * @param endpointUrl
+ * @return AddressEndpoint
+ */
+ private Endpoint getEndpointFromURL(String endpointUrl) {
+ AddressEndpoint endpoint = new AddressEndpoint();
+ EndpointDefinition def = new EndpointDefinition();
+ def.setAddress(endpointUrl.trim());
+ endpoint.setDefinition(def);
+ return endpoint;
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java Fri Mar 20 10:47:56 2009
@@ -25,6 +25,5 @@
public class SynapseEventingConstants {
public static final String TOPIC_FILTER_DIALECT =
"http://synapse.apache.org/eventing/dialect/topicFilter";
- public static final String FILTER_VALUE = "filter";
- public static final String FILTER_DIALECT = "dialect";
+ public static final String STATIC_ENTRY = "staticEntry";
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java Fri Mar 20 10:47:56 2009
@@ -23,57 +23,24 @@
import org.apache.synapse.util.UUIDGenerator;
import org.wso2.eventing.EventingConstants;
import org.wso2.eventing.Subscription;
+import org.wso2.eventing.SubscriptionData;
/**
* Bean that keep subscription and subscription metadata.
*/
public class SynapseSubscription extends Subscription {
- private SynapseEventFilter filter;
- private Endpoint endpoint;
- private boolean staticEntry;
- private String subManagerURI;
public SynapseSubscription() {
this.setId(UUIDGenerator.getUUID());
this.setDeliveryMode(EventingConstants.WSE_DEFAULT_DELIVERY_MODE);
- this.setStaticEntry(false);
+ SubscriptionData subscriptionData = new SubscriptionData();
+ subscriptionData.setProperty(SynapseEventingConstants.STATIC_ENTRY, "false");
+ this.setSubscriptionData(subscriptionData);
}
public SynapseSubscription(String deliveryMode) {
this.setId(UUIDGenerator.getUUID());
this.setDeliveryMode(deliveryMode);
}
-
- public SynapseEventFilter getSynapseFilter() {
- return filter;
- }
-
- public void setFilter(SynapseEventFilter filter) {
- this.filter = filter;
- }
-
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- public void setEndpoint(Endpoint endpoint) {
- this.endpoint = endpoint;
- }
-
- public boolean isStaticEntry() {
- return staticEntry;
- }
-
- public void setStaticEntry(boolean staticEntry) {
- this.staticEntry = staticEntry;
- }
-
- public String getSubManagerURI() {
- return subManagerURI;
- }
-
- public void setSubManagerURI(String subManagerURI) {
- this.subManagerURI = subManagerURI;
- }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java Fri Mar 20 10:47:56 2009
@@ -85,7 +85,7 @@
public SOAPEnvelope genSubscriptionResponse(SynapseSubscription subscription) {
SOAPEnvelope message = factory.getDefaultEnvelope();
EndpointReference subscriptionManagerEPR =
- new EndpointReference(subscription.getSubManagerURI());
+ new EndpointReference(subscription.getSubManUrl());
subscriptionManagerEPR.addReferenceParameter(new QName(EventingConstants.WSE_EVENTING_NS,
EventingConstants.WSE_EN_IDENTIFIER, EventingConstants.WSE_EVENTING_PREFIX),
subscription.getId());
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java Fri Mar 20 10:47:56 2009
@@ -21,23 +21,17 @@
import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
+import org.apache.axis2.context.MessageContext;
import org.apache.axis2.databinding.utils.ConverterUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.context.MessageContext;
import org.apache.synapse.SynapseException;
-import org.apache.synapse.config.xml.SynapseXPathFactory;
import org.apache.synapse.config.xml.XMLConfigConstants;
import org.apache.synapse.endpoints.AddressEndpoint;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.endpoints.EndpointDefinition;
-import org.apache.synapse.eventing.SynapseEventingConstants;
import org.apache.synapse.eventing.SynapseSubscription;
-import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
-import org.apache.synapse.util.xpath.SynapseXPath;
-import org.jaxen.JaxenException;
import org.wso2.eventing.EventingConstants;
-import org.wso2.eventing.SubscriptionData;
import javax.xml.namespace.QName;
import java.util.Calendar;
@@ -59,8 +53,6 @@
new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_NOTIFY_TO);
private static final QName ATT_DIALECT =
new QName(XMLConfigConstants.NULL_NAMESPACE, EventingConstants.WSE_EN_DIALECT);
- private static final QName ATT_XPATH =
- new QName(XMLConfigConstants.NULL_NAMESPACE, EventingConstants.WSE_EN_XPATH);
private static final QName IDENTIFIER =
new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_IDENTIFIER);
private static final QName EXPIRES =
@@ -134,15 +126,12 @@
if (deliveryElem != null) {
notifyToElem = deliveryElem.getFirstChildWithName(NOTIFY_TO_QNAME);
if (notifyToElem != null) {
- Endpoint ep = getEndpointFromWSAAddress(notifyToElem.getFirstElement());
- if (ep != null) {
- subscription = new SynapseSubscription(
- EventingConstants.WSE_DEFAULT_DELIVERY_MODE);
- subscription.setEndpoint(ep);
- subscription.setAddressUrl(notifyToElem.getFirstElement().getText());
- subscription.setEndpointUrl(notifyToElem.getFirstElement().getText());
- subscription.setSubManagerURI(mc.getTo().getAddress());
- }
+ subscription = new SynapseSubscription(
+ EventingConstants.WSE_DEFAULT_DELIVERY_MODE);
+ subscription.setAddressUrl(notifyToElem.getFirstElement().getText());
+ subscription.setEndpointUrl(notifyToElem.getFirstElement().getText());
+ subscription.setSubManUrl(mc.getTo().getAddress());
+
} else {
handleException("NotifyTo element not found in the subscription message");
}
@@ -154,27 +143,8 @@
if (subscription != null && filterElem != null) {
OMAttribute dialectAttr = filterElem.getAttribute(ATT_DIALECT);
if (dialectAttr != null && dialectAttr.getAttributeValue() != null) {
- if (SynapseEventingConstants.TOPIC_FILTER_DIALECT
- .equals(dialectAttr.getAttributeValue())) {
- XPathBasedEventFilter filter = new XPathBasedEventFilter();
- filter.setResultValue(filterElem.getText());
- if (filterElem.getAttribute(ATT_XPATH) != null) {
- try {
- SynapseXPath xpath =
- SynapseXPathFactory.getSynapseXPath(filterElem, ATT_XPATH);
- filter.setSourceXpath(xpath);
- } catch (JaxenException e) {
- handleException("Unable to create the SynapseEventFilter xpath", e);
- }
- }
- subscription.setFilter(filter);
- SubscriptionData subscriptionData = new SubscriptionData();
- subscriptionData.setProperty(SynapseEventingConstants.FILTER_VALUE,
- filterElem.getText());
- subscriptionData.setProperty(SynapseEventingConstants.FILTER_DIALECT,
- dialectAttr.getAttributeValue());
- subscription.setSubscriptionData(subscriptionData);
- }
+ subscription.setFilterDialect(dialectAttr.getAttributeValue());
+ subscription.setFilterValue(filterElem.getText());
} else {
handleException("Error in creating subscription. Filter dialect not defined");
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java Fri Mar 20 10:47:56 2009
@@ -25,6 +25,7 @@
import org.apache.synapse.SynapseException;
import org.apache.synapse.eventing.SynapseSubscription;
import org.apache.synapse.eventing.SynapseSubscriptionManager;
+import org.apache.synapse.eventing.SynapseEventingConstants;
import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
import org.apache.synapse.util.xpath.SynapseXPath;
import org.jaxen.JaxenException;
@@ -93,9 +94,10 @@
public List<SynapseSubscription> getMatchingSubscribers(MessageContext mc) {
final LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
- XPathBasedEventFilter filter =
- (XPathBasedEventFilter) stringSubscriptionEntry.getValue().getSynapseFilter();
+ //TODO : pick the filter based on the dialect
+ XPathBasedEventFilter filter = new XPathBasedEventFilter();
if (filter != null) {
+ filter.setResultValue(stringSubscriptionEntry.getValue().getFilterValue());
filter.setSourceXpath(topicXPath);
}
if (filter == null || filter.isSatisfied(mc)) {
@@ -119,7 +121,8 @@
public List<SynapseSubscription> getStaticSubscribers() {
LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
- if (stringSubscriptionEntry.getValue().isStaticEntry()) {
+ if ((stringSubscriptionEntry.getValue().getSubscriptionData().getProperty(
+ SynapseEventingConstants.STATIC_ENTRY)).equals("true")) {
list.add(stringSubscriptionEntry.getValue());
}
}
Modified: synapse/trunk/java/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/pom.xml?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/pom.xml (original)
+++ synapse/trunk/java/pom.xml Fri Mar 20 10:47:56 2009
@@ -944,7 +944,7 @@
<wso2commons.version>1.2</wso2commons.version>
<wso2caching.version>1.6.1</wso2caching.version>
<wso2throttle.version>1.6</wso2throttle.version>
- <wso2eventing-api.version>1.1</wso2eventing-api.version>
+ <wso2eventing-api.version>1.2-SNAPSHOT</wso2eventing-api.version>
<xbean.version>2.2.0</xbean.version>
<bsf.version>3.0-beta2</bsf.version>
<groovy.version>1.1-rc-1</groovy.version>