You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2009/03/20 11:47:57 UTC

svn commit: r756411 - in /synapse/trunk/java: ./ modules/core/src/main/java/org/apache/synapse/config/xml/eventing/ modules/core/src/main/java/org/apache/synapse/eventing/ modules/core/src/main/java/org/apache/synapse/eventing/builders/ modules/core/sr...

Author: asanka
Date: Fri Mar 20 10:47:56 2009
New Revision: 756411

URL: http://svn.apache.org/viewvc?rev=756411&view=rev
Log:
Using the eventing core implementation in the eventing api (1.2) by removing the custom implementation done for synapse. (filter, epr removed)

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
    synapse/trunk/java/pom.xml

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java Fri Mar 20 10:47:56 2009
@@ -156,25 +156,15 @@
             OMElement elmFilter = elmSubscription.getFirstChildWithName(FILTER_QNAME);
             OMAttribute dialectAttr = elmFilter.getAttribute(FILTER_DIALECT_QNAME);
             if (dialectAttr != null && dialectAttr.getAttributeValue() != null) {
-                if (SynapseEventingConstants.TOPIC_FILTER_DIALECT
-                        .equals(dialectAttr.getAttributeValue())) {
-                    XPathBasedEventFilter filter = new XPathBasedEventFilter();
+
                     OMAttribute sourceAttr = elmFilter.getAttribute(FILTER_SOURCE_QNAME);
                     if (sourceAttr != null) {
-                        filter.setResultValue(sourceAttr.getAttributeValue());
+                        synapseSubscription.setFilterDialect(dialectAttr.getAttributeValue());
+                        synapseSubscription.setFilterValue(elmFilter.getText());
                     } else {
                         handleException(
                                 "Error in creating static subscription. Filter source not defined");
-                    }
-                    synapseSubscription.setFilter(filter);
-                    SubscriptionData subscriptionData = new SubscriptionData();
-                    subscriptionData.setProperty(SynapseEventingConstants.FILTER_VALUE,
-                            sourceAttr.getAttributeValue());
-                    subscriptionData.setProperty(SynapseEventingConstants.FILTER_DIALECT,
-                            dialectAttr.getAttributeValue());
-                    synapseSubscription.setSubscriptionData(subscriptionData);
-                }
-
+                    }             
             } else {
                 handleException(
                         "Error in creating static subscription. Filter dialect not defined");
@@ -183,13 +173,8 @@
             if (elmEndpoint != null) {
                 OMElement elmAddress = elmEndpoint.getFirstChildWithName(ADDRESS_QNAME);
                 if (elmAddress != null) {
-                    AddressEndpoint endpoint = new AddressEndpoint();
-                    EndpointDefinition def = new EndpointDefinition();
                     OMAttribute uriAttr = elmAddress.getAttribute(EP_URI_QNAME);
                     if (uriAttr != null) {
-                        def.setAddress(uriAttr.getAttributeValue());
-                        endpoint.setDefinition(def);
-                        synapseSubscription.setEndpoint(endpoint);
                         synapseSubscription.setEndpointUrl(uriAttr.getAttributeValue());
                         synapseSubscription.setAddressUrl(uriAttr.getAttributeValue());
                     } else {
@@ -219,7 +204,8 @@
             } else {
                 synapseSubscription.setExpires(null);
             }
-            synapseSubscription.setStaticEntry(true);
+            synapseSubscription.getSubscriptionData()
+                    .setProperty(SynapseEventingConstants.STATIC_ENTRY, "true");
             synapseEventSource.getSubscriptionManager().addSubscription(synapseSubscription);
         }
     }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java Fri Mar 20 10:47:56 2009
@@ -86,11 +86,9 @@
                 OMElement filterElem =
                         fac.createOMElement("filter", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
                 filterElem.addAttribute(fac.createOMAttribute("source", nullNS,
-                        (String) staticSubscription.getSubscriptionData()
-                                .getProperty(SynapseEventingConstants.FILTER_VALUE)));
+                        (String) staticSubscription.getFilterValue()));
                 filterElem.addAttribute(fac.createOMAttribute("dialect", nullNS,
-                        (String) staticSubscription.getSubscriptionData()
-                                .getProperty(SynapseEventingConstants.FILTER_DIALECT)));
+                        (String) staticSubscription.getFilterDialect()));
                 staticSubElem.addChild(filterElem);
                 OMElement endpointElem =
                         fac.createOMElement("endpoint", XMLConfigConstants.SYNAPSE_OMNAMESPACE);

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java Fri Mar 20 10:47:56 2009
@@ -20,6 +20,7 @@
 package org.apache.synapse.eventing;
 
 import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisOperation;
@@ -31,6 +32,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.EndpointDefinition;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
@@ -185,7 +189,8 @@
                 synCtx.setProperty(SynapseConstants.OUT_ONLY,
                         "true");    // Set one way message for events
                 try {
-                    subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
+                    getEndpointFromURL(subscription.getEndpointUrl())
+                            .send(MessageHelper.cloneMessageContext(synCtx));
                 } catch (AxisFault axisFault) {
                     log.error("Event sending failure " + axisFault.toString());
                 }
@@ -365,8 +370,21 @@
                     SubscriptionMessageBuilder.getErrorCode(),
                     SubscriptionMessageBuilder.getErrorSubCode(),
                     SubscriptionMessageBuilder.getErrorReason(), "");
-            dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                    true);
+            dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc, true);
         }
     }
+
+    /**
+     * Create a Endpoint for a given URL
+     *
+     * @param endpointUrl
+     * @return AddressEndpoint
+     */
+    private Endpoint getEndpointFromURL(String endpointUrl) {
+        AddressEndpoint endpoint = new AddressEndpoint();
+        EndpointDefinition def = new EndpointDefinition();
+        def.setAddress(endpointUrl.trim());
+        endpoint.setDefinition(def);
+        return endpoint;
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventingConstants.java Fri Mar 20 10:47:56 2009
@@ -25,6 +25,5 @@
 public class SynapseEventingConstants {
     public static final String TOPIC_FILTER_DIALECT =
             "http://synapse.apache.org/eventing/dialect/topicFilter";
-    public static final String FILTER_VALUE = "filter";
-    public static final String FILTER_DIALECT = "dialect";
+    public static final String STATIC_ENTRY = "staticEntry";    
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscription.java Fri Mar 20 10:47:56 2009
@@ -23,57 +23,24 @@
 import org.apache.synapse.util.UUIDGenerator;
 import org.wso2.eventing.EventingConstants;
 import org.wso2.eventing.Subscription;
+import org.wso2.eventing.SubscriptionData;
 
 /**
  * Bean that keep subscription and subscription metadata.
  */
 public class SynapseSubscription extends Subscription {
 
-    private SynapseEventFilter filter;
-    private Endpoint endpoint;
-    private boolean staticEntry;
-    private String subManagerURI;
 
     public SynapseSubscription() {
         this.setId(UUIDGenerator.getUUID());
         this.setDeliveryMode(EventingConstants.WSE_DEFAULT_DELIVERY_MODE);
-        this.setStaticEntry(false);
+        SubscriptionData subscriptionData = new SubscriptionData();
+        subscriptionData.setProperty(SynapseEventingConstants.STATIC_ENTRY, "false");
+        this.setSubscriptionData(subscriptionData);
     }
 
     public SynapseSubscription(String deliveryMode) {
         this.setId(UUIDGenerator.getUUID());
         this.setDeliveryMode(deliveryMode);
     }
-
-    public SynapseEventFilter getSynapseFilter() {
-        return filter;
-    }
-
-    public void setFilter(SynapseEventFilter filter) {
-        this.filter = filter;
-    }
-
-    public Endpoint getEndpoint() {
-        return endpoint;
-    }
-
-    public void setEndpoint(Endpoint endpoint) {
-        this.endpoint = endpoint;
-    }
-
-    public boolean isStaticEntry() {
-        return staticEntry;
-    }
-
-    public void setStaticEntry(boolean staticEntry) {
-        this.staticEntry = staticEntry;
-    }
-
-    public String getSubManagerURI() {
-        return subManagerURI;
-    }
-
-    public void setSubManagerURI(String subManagerURI) {
-        this.subManagerURI = subManagerURI;
-    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/ResponseMessageBuilder.java Fri Mar 20 10:47:56 2009
@@ -85,7 +85,7 @@
     public SOAPEnvelope genSubscriptionResponse(SynapseSubscription subscription) {
         SOAPEnvelope message = factory.getDefaultEnvelope();
         EndpointReference subscriptionManagerEPR =
-                new EndpointReference(subscription.getSubManagerURI());
+                new EndpointReference(subscription.getSubManUrl());
         subscriptionManagerEPR.addReferenceParameter(new QName(EventingConstants.WSE_EVENTING_NS,
                 EventingConstants.WSE_EN_IDENTIFIER, EventingConstants.WSE_EVENTING_PREFIX),
                 subscription.getId());

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/builders/SubscriptionMessageBuilder.java Fri Mar 20 10:47:56 2009
@@ -21,23 +21,17 @@
 
 import org.apache.axiom.om.OMAttribute;
 import org.apache.axiom.om.OMElement;
+import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.databinding.utils.ConverterUtil;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.axis2.context.MessageContext;
 import org.apache.synapse.SynapseException;
-import org.apache.synapse.config.xml.SynapseXPathFactory;
 import org.apache.synapse.config.xml.XMLConfigConstants;
 import org.apache.synapse.endpoints.AddressEndpoint;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.endpoints.EndpointDefinition;
-import org.apache.synapse.eventing.SynapseEventingConstants;
 import org.apache.synapse.eventing.SynapseSubscription;
-import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
-import org.apache.synapse.util.xpath.SynapseXPath;
-import org.jaxen.JaxenException;
 import org.wso2.eventing.EventingConstants;
-import org.wso2.eventing.SubscriptionData;
 
 import javax.xml.namespace.QName;
 import java.util.Calendar;
@@ -59,8 +53,6 @@
             new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_NOTIFY_TO);
     private static final QName ATT_DIALECT =
             new QName(XMLConfigConstants.NULL_NAMESPACE, EventingConstants.WSE_EN_DIALECT);
-    private static final QName ATT_XPATH =
-            new QName(XMLConfigConstants.NULL_NAMESPACE, EventingConstants.WSE_EN_XPATH);
     private static final QName IDENTIFIER =
             new QName(EventingConstants.WSE_EVENTING_NS, EventingConstants.WSE_EN_IDENTIFIER);
     private static final QName EXPIRES =
@@ -134,15 +126,12 @@
             if (deliveryElem != null) {
                 notifyToElem = deliveryElem.getFirstChildWithName(NOTIFY_TO_QNAME);
                 if (notifyToElem != null) {
-                    Endpoint ep = getEndpointFromWSAAddress(notifyToElem.getFirstElement());
-                    if (ep != null) {
-                        subscription = new SynapseSubscription(
-                                EventingConstants.WSE_DEFAULT_DELIVERY_MODE);
-                        subscription.setEndpoint(ep);
-                        subscription.setAddressUrl(notifyToElem.getFirstElement().getText());
-                        subscription.setEndpointUrl(notifyToElem.getFirstElement().getText());
-                        subscription.setSubManagerURI(mc.getTo().getAddress());
-                    }
+                    subscription = new SynapseSubscription(
+                            EventingConstants.WSE_DEFAULT_DELIVERY_MODE);
+                    subscription.setAddressUrl(notifyToElem.getFirstElement().getText());
+                    subscription.setEndpointUrl(notifyToElem.getFirstElement().getText());
+                    subscription.setSubManUrl(mc.getTo().getAddress());
+
                 } else {
                     handleException("NotifyTo element not found in the subscription message");
                 }
@@ -154,27 +143,8 @@
             if (subscription != null && filterElem != null) {
                 OMAttribute dialectAttr = filterElem.getAttribute(ATT_DIALECT);
                 if (dialectAttr != null && dialectAttr.getAttributeValue() != null) {
-                    if (SynapseEventingConstants.TOPIC_FILTER_DIALECT
-                            .equals(dialectAttr.getAttributeValue())) {
-                        XPathBasedEventFilter filter = new XPathBasedEventFilter();
-                        filter.setResultValue(filterElem.getText());
-                        if (filterElem.getAttribute(ATT_XPATH) != null) {
-                            try {
-                                SynapseXPath xpath =
-                                        SynapseXPathFactory.getSynapseXPath(filterElem, ATT_XPATH);
-                                filter.setSourceXpath(xpath);
-                            } catch (JaxenException e) {
-                                handleException("Unable to create the SynapseEventFilter xpath", e);
-                            }
-                        }
-                        subscription.setFilter(filter);
-                        SubscriptionData subscriptionData = new SubscriptionData();
-                        subscriptionData.setProperty(SynapseEventingConstants.FILTER_VALUE,
-                                filterElem.getText());
-                        subscriptionData.setProperty(SynapseEventingConstants.FILTER_DIALECT,
-                                dialectAttr.getAttributeValue());
-                        subscription.setSubscriptionData(subscriptionData);
-                    }
+                    subscription.setFilterDialect(dialectAttr.getAttributeValue());
+                    subscription.setFilterValue(filterElem.getText());
                 } else {
                     handleException("Error in creating subscription. Filter dialect not defined");
                 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java Fri Mar 20 10:47:56 2009
@@ -25,6 +25,7 @@
 import org.apache.synapse.SynapseException;
 import org.apache.synapse.eventing.SynapseSubscription;
 import org.apache.synapse.eventing.SynapseSubscriptionManager;
+import org.apache.synapse.eventing.SynapseEventingConstants;
 import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
 import org.apache.synapse.util.xpath.SynapseXPath;
 import org.jaxen.JaxenException;
@@ -93,9 +94,10 @@
     public List<SynapseSubscription> getMatchingSubscribers(MessageContext mc) {
         final LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
         for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
-            XPathBasedEventFilter filter =
-                    (XPathBasedEventFilter) stringSubscriptionEntry.getValue().getSynapseFilter();
+            //TODO : pick the filter based on the dialect
+            XPathBasedEventFilter filter = new XPathBasedEventFilter();
             if (filter != null) {
+                filter.setResultValue(stringSubscriptionEntry.getValue().getFilterValue());
                 filter.setSourceXpath(topicXPath);
             }
             if (filter == null || filter.isSatisfied(mc)) {
@@ -119,7 +121,8 @@
     public List<SynapseSubscription> getStaticSubscribers() {
         LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
         for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
-            if (stringSubscriptionEntry.getValue().isStaticEntry()) {
+            if ((stringSubscriptionEntry.getValue().getSubscriptionData().getProperty(
+                    SynapseEventingConstants.STATIC_ENTRY)).equals("true")) {
                 list.add(stringSubscriptionEntry.getValue());
             }
         }

Modified: synapse/trunk/java/pom.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/pom.xml?rev=756411&r1=756410&r2=756411&view=diff
==============================================================================
--- synapse/trunk/java/pom.xml (original)
+++ synapse/trunk/java/pom.xml Fri Mar 20 10:47:56 2009
@@ -944,7 +944,7 @@
         <wso2commons.version>1.2</wso2commons.version>
         <wso2caching.version>1.6.1</wso2caching.version>
         <wso2throttle.version>1.6</wso2throttle.version>
-        <wso2eventing-api.version>1.1</wso2eventing-api.version>
+        <wso2eventing-api.version>1.2-SNAPSHOT</wso2eventing-api.version>
         <xbean.version>2.2.0</xbean.version>
         <bsf.version>3.0-beta2</bsf.version>
         <groovy.version>1.1-rc-1</groovy.version>