You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2009/03/30 14:28:30 UTC

svn commit: r759941 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse: config/xml/eventing/ eventing/ eventing/managers/

Author: asanka
Date: Mon Mar 30 12:28:30 2009
New Revision: 759941

URL: http://svn.apache.org/viewvc?rev=759941&view=rev
Log:
Removed the duplicated methods from SynapseSubscriptionManager; the matching-subscriptions operation now takes an Event<T> as its input parameter.

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java Mon Mar 30 12:28:30 2009
@@ -35,6 +35,7 @@
 import org.apache.synapse.eventing.SynapseSubscriptionManager;
 import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
 import org.wso2.eventing.SubscriptionData;
+import org.wso2.eventing.exceptions.EventException;
 
 import javax.xml.namespace.QName;
 import java.util.Iterator;
@@ -125,7 +126,13 @@
             handleException(
                     "SynapseSubscription Manager has not been specified for the event source");
         }
-        createStaticSubscriptions(elem, eventSource);
+
+        try {
+            createStaticSubscriptions(elem, eventSource);
+        } catch (EventException e) {
+            handleException("Static subscription creation failure",e);
+        }
+
         return eventSource;
     }
 
@@ -146,7 +153,8 @@
      * @param synapseEventSource
      */
     private static void createStaticSubscriptions(OMElement elem,
-                                                  SynapseEventSource synapseEventSource) {
+                                                  SynapseEventSource synapseEventSource)
+            throws EventException {
         for (Iterator iterator = elem.getChildrenWithName(SUBSCRIPTION_QNAME);
              iterator.hasNext();) {
             SynapseSubscription synapseSubscription = new SynapseSubscription();
@@ -206,7 +214,7 @@
             }
             synapseSubscription.getSubscriptionData()
                     .setProperty(SynapseEventingConstants.STATIC_ENTRY, "true");
-            synapseEventSource.getSubscriptionManager().addSubscription(synapseSubscription);
+                synapseEventSource.getSubscriptionManager().subscribe(synapseSubscription);
         }
     }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java Mon Mar 30 12:28:30 2009
@@ -28,6 +28,7 @@
 import org.apache.synapse.eventing.SynapseEventSource;
 import org.apache.synapse.eventing.SynapseEventingConstants;
 import org.apache.synapse.eventing.SynapseSubscription;
+import org.wso2.eventing.Subscription;
 
 import java.util.Iterator;
 import java.util.List;
@@ -74,11 +75,11 @@
             }
             evenSourceElem.addChild(subManagerElem);
             // Adding static subscriptions
-            List<SynapseSubscription> staticSubscriptionList =
+            List<Subscription> staticSubscriptionList =
                     eventSource.getSubscriptionManager().getStaticSubscribers();
-            for (Iterator<SynapseSubscription> iterator = staticSubscriptionList.iterator();
+            for (Iterator<Subscription> iterator = staticSubscriptionList.iterator();
                  iterator.hasNext();) {
-                SynapseSubscription staticSubscription = iterator.next();
+                Subscription staticSubscription = iterator.next();
                 OMElement staticSubElem =
                         fac.createOMElement("subscription", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
                 staticSubElem.addAttribute(

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java Mon Mar 30 12:28:30 2009
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.endpoints.AddressEndpoint;
 import org.apache.synapse.endpoints.EndpointDefinition;
@@ -43,6 +44,9 @@
 import org.apache.synapse.eventing.builders.SubscriptionMessageBuilder;
 import org.apache.synapse.util.MessageHelper;
 import org.wso2.eventing.EventingConstants;
+import org.wso2.eventing.Subscription;
+import org.wso2.eventing.Event;
+import org.wso2.eventing.exceptions.EventException;
 
 import javax.xml.namespace.QName;
 import java.util.List;
@@ -110,25 +114,28 @@
         org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv);
         // initialisze the response message builder using the message context
         ResponseMessageBuilder messageBuilder = new ResponseMessageBuilder(mc);
-
-        if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
-            // add new subscription to the SynapseSubscription store through subscription manager
-            processSubscriptionRequest(mc, messageBuilder);
-        } else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
-            // Unsubscribe the matching subscription
-            processUnSubscribeRequest(mc, messageBuilder);
-        } else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
-            // Responce with the status of the subscription
-            processGetStatusRequest(mc, messageBuilder);
-        } else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
-            // Renew subscription
-            processReNewRequest(mc, messageBuilder);
-        } else {
-            // Treat as an Event
-            if (log.isDebugEnabled()) {
-                log.debug("Event recived");
+        try {
+            if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
+                // add new subscription to the SynapseSubscription store through subscription manager
+                processSubscriptionRequest(mc, messageBuilder);
+            } else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
+                // Unsubscribe the matching subscription
+                processUnSubscribeRequest(mc, messageBuilder);
+            } else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
+                // Responce with the status of the subscription
+                processGetStatusRequest(mc, messageBuilder);
+            } else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
+                // Renew subscription
+                processReNewRequest(mc, messageBuilder);
+            } else {
+                // Treat as an Event
+                if (log.isDebugEnabled()) {
+                    log.debug("Event recived");
+                }
+                dispatchEvents(smc);
             }
-            dispatchEvents(smc);
+        } catch (EventException e) {
+           handleException("Subscription manager processing error",e);
         }
     }
 
@@ -164,8 +171,15 @@
      * @param msgCtx message context
      */
     public void dispatchEvents(org.apache.synapse.MessageContext msgCtx) {
+        Event<org.apache.synapse.MessageContext> event = new Event(msgCtx);
+
+        List<Subscription> subscribers = null;
+        try {
+            subscribers = subscriptionManager.getMatchingSubscriptions(event);
+        } catch (EventException e) {
+            handleException("Matching subscriptions fetching error",e);
+        }
 
-        List<SynapseSubscription> subscribers = subscriptionManager.getMatchingSubscribers(msgCtx);
         // Call event dispatcher
         msgCtx.getEnvironment().getExecutorService()
                 .execute(new EventDispatcher(msgCtx, subscribers));
@@ -176,16 +190,16 @@
      */
     class EventDispatcher implements Runnable {
         private org.apache.synapse.MessageContext synCtx;
-        private List<SynapseSubscription> subscribers;
+        private List<Subscription> subscribers;
 
         EventDispatcher(org.apache.synapse.MessageContext synCtx,
-                        List<SynapseSubscription> subscribers) {
+                        List<Subscription> subscribers) {
             this.synCtx = synCtx;
             this.subscribers = subscribers;
         }
 
         public void run() {
-            for (SynapseSubscription subscription : subscribers) {
+            for (Subscription subscription : subscribers) {
                 synCtx.setProperty(SynapseConstants.OUT_ONLY,
                         "true");    // Set one way message for events
                 try {
@@ -210,13 +224,13 @@
      */
     private void processSubscriptionRequest(MessageContext mc,
                                             ResponseMessageBuilder messageBuilder)
-            throws AxisFault {
+            throws AxisFault, EventException {
         SynapseSubscription subscription = SubscriptionMessageBuilder.createSubscription(mc);
         if (log.isDebugEnabled()) {
             log.debug("SynapseSubscription request recived  : " + subscription.getId());
         }
         if (subscription.getId() != null) {
-            String subID = subscriptionManager.addSubscription(subscription);
+            String subID = subscriptionManager.subscribe(subscription);
             if (subID != null) {
                 // Send the subscription responce
                 if (log.isDebugEnabled()) {
@@ -260,14 +274,15 @@
      * @throws AxisFault
      */
     private void processUnSubscribeRequest(MessageContext mc,
-                                           ResponseMessageBuilder messageBuilder) throws AxisFault {
+                                           ResponseMessageBuilder messageBuilder)
+            throws AxisFault, EventException {
         SynapseSubscription subscription =
                 SubscriptionMessageBuilder.createUnSubscribeMessage(mc);
         if (log.isDebugEnabled()) {
             log.debug("UnSubscribe response recived for SynapseSubscription ID : " +
                     subscription.getId());
         }
-        if (subscriptionManager.deleteSubscription(subscription.getId())) {
+        if (subscriptionManager.unsubscribe(subscription.getId())) {
             //send the response
             if (log.isDebugEnabled()) {
                 log.debug("Sending UnSubscribe responce for SynapseSubscription ID : " +
@@ -297,14 +312,16 @@
      * @throws AxisFault
      */
     private void processGetStatusRequest(MessageContext mc,
-                                         ResponseMessageBuilder messageBuilder) throws AxisFault {
+                                         ResponseMessageBuilder messageBuilder)
+            throws AxisFault, EventException {
         SynapseSubscription subscription =
                 SubscriptionMessageBuilder.createGetStatusMessage(mc);
         if (log.isDebugEnabled()) {
             log.debug("GetStatus request recived for SynapseSubscription ID : " +
                     subscription.getId());
         }
-        subscription = subscriptionManager.getSubscription(subscription.getId());
+        subscription =
+                (SynapseSubscription) subscriptionManager.getSubscription(subscription.getId());
         if (subscription != null) {
             if (log.isDebugEnabled()) {
                 log.debug("Sending GetStatus responce for SynapseSubscription ID : " +
@@ -335,7 +352,8 @@
      * @throws AxisFault
      */
     private void processReNewRequest(MessageContext mc,
-                                     ResponseMessageBuilder messageBuilder) throws AxisFault {
+                                     ResponseMessageBuilder messageBuilder)
+            throws AxisFault, EventException {
         SynapseSubscription subscription =
                 SubscriptionMessageBuilder.createRenewSubscribeMessage(mc);
         if (log.isDebugEnabled()) {
@@ -344,7 +362,7 @@
         }
         String subID = subscription.getId();
         if (subID != null) {
-            if (subscriptionManager.renewSubscription(subscription)) {
+            if (subscriptionManager.renew(subscription)) {
                 //send the response
                 if (log.isDebugEnabled()) {
                     log.debug("Sending ReNew response for SynapseSubscription ID : " +
@@ -387,4 +405,9 @@
         endpoint.setDefinition(def);
         return endpoint;
     }
+
+    private void handleException(String message, Exception e) {
+        log.error(message, e);
+        throw new SynapseException(message, e);
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java Mon Mar 30 12:28:30 2009
@@ -21,6 +21,7 @@
 
 import org.apache.synapse.MessageContext;
 import org.wso2.eventing.SubscriptionManager;
+import org.wso2.eventing.Subscription;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -30,65 +31,17 @@
 /**
  * Subscription Manager for Synapse
  */
-public abstract class SynapseSubscriptionManager implements SubscriptionManager {
+public abstract class SynapseSubscriptionManager implements SubscriptionManager<MessageContext> {
 
     private final Map<String, String> properties = new HashMap<String, String>();
 
     /**
-     * Return all Active subscriptions
-     *
-     * @return List of subscriptions
-     */
-    public abstract List<SynapseSubscription> getSynapseSubscribers();
-
-    /**
-     * Get the matching subscriptions for a given filter.
-     *
-     * @param mc Message context
-     * @return List of subscriptions
-     */
-    public abstract List<SynapseSubscription> getMatchingSubscribers(MessageContext mc);
-
-    /**
      * Get the static subscription defined in the configuration
      *
      * @return List of static subscriptions
      */
-    public abstract List<SynapseSubscription> getStaticSubscribers();
+    public abstract List<Subscription> getStaticSubscribers();
 
-    /**
-     * Get a subscription by subscription ID
-     *
-     * @param id subscription ID
-     * @return SynapseSubscription
-     */
-    public abstract SynapseSubscription getSubscription(String id);
-
-    /**
-     * Add a new subscription to the store
-     *
-     * @param subs Subscription object
-     * @return String subscription ID
-     */
-    public abstract String addSubscription(SynapseSubscription subs);
-
-    /**
-     * Delete a given subscription
-     *
-     * @param id Subscription ID
-     * @return True|False
-     */
-    public abstract boolean deleteSubscription(String id);
-
-    /**
-     * Renew a given subscription
-     *
-     * @param subscription subscription object
-     * @return True|False
-     */
-    public abstract boolean renewSubscription(SynapseSubscription subscription);
-
-    public abstract void init();
 
     public void addProperty(String name, String value) {
         properties.put(name, value);
@@ -100,5 +53,5 @@
 
     public String getPropertyValue(String name) {
         return properties.get(name);
-    }       
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java Mon Mar 30 12:28:30 2009
@@ -32,6 +32,7 @@
 import org.jaxen.JaxenException;
 import org.wso2.eventing.Subscription;
 import org.wso2.eventing.Event;
+import org.wso2.eventing.EventFilter;
 import org.wso2.eventing.exceptions.EventException;
 
 import java.util.Calendar;
@@ -45,22 +46,34 @@
  */
 public class DefaultInMemorySubscriptionManager extends SynapseSubscriptionManager {
 
-    private final Map<String, SynapseSubscription> store =
-            new ConcurrentHashMap<String, SynapseSubscription>();
+    private final Map<String, Subscription> store =
+            new ConcurrentHashMap<String, Subscription>();
     private String topicHeaderName;
     private String topicHeaderNS;
     private SynapseXPath topicXPath;
     private static final Log log = LogFactory.getLog(DefaultInMemorySubscriptionManager.class);
 
-    public String addSubscription(SynapseSubscription subs) {
-        if (subs.getId() == null) {
-            subs.setId(org.apache.axiom.om.util.UUIDGenerator.getUUID());
+    public List<Subscription> getStaticSubscribers() {
+        LinkedList<Subscription> list = new LinkedList<Subscription>();
+        for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
+            if ((stringSubscriptionEntry.getValue().getSubscriptionData().getProperty(
+                    SynapseEventingConstants.STATIC_ENTRY)).equals("true")) {
+                list.add(stringSubscriptionEntry.getValue());
+            }
+        }
+        return list;
+    }
+
+    public String subscribe(Subscription subscription) throws EventException {
+        if (subscription.getId() == null) {
+            subscription.setId(org.apache.axiom.om.util.UUIDGenerator.getUUID());
         }
-        store.put(subs.getId(), subs);
-        return subs.getId();
+        store.put(subscription.getId(), subscription);
+        return subscription.getId();
+
     }
 
-    public boolean deleteSubscription(String id) {
+    public boolean unsubscribe(String id) throws EventException {
         if (store.containsKey(id)) {
             store.remove(id);
             return true;
@@ -69,14 +82,9 @@
         }
     }
 
-    /**
-     * Renew the subscription by setting the expire date time
-     *
-     * @param subscription
-     * @return
-     */
-    public boolean renewSubscription(SynapseSubscription subscription) {
-        SynapseSubscription subscriptionOld = getSubscription(subscription.getId());
+
+    public boolean renew(Subscription subscription) throws EventException {
+        Subscription subscriptionOld = getSubscription(subscription.getId());
         if (subscriptionOld != null) {
             subscriptionOld.setExpires(subscription.getExpires());
             return true;
@@ -85,18 +93,26 @@
         }
     }
 
-    public List<SynapseSubscription> getSynapseSubscribers() {
-        LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
-        for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
+    public List<Subscription> getSubscriptions() throws EventException {
+        LinkedList<Subscription> list = new LinkedList<Subscription>();
+        for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
             list.add(stringSubscriptionEntry.getValue());
         }
         return list;
     }
 
-    public List<SynapseSubscription> getMatchingSubscribers(MessageContext mc) {
-        final LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
-        String evaluatedValue = null;
-        for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
+    public List<Subscription> getAllSubscriptions() throws EventException {
+        LinkedList<Subscription> list = new LinkedList<Subscription>();
+        for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
+            list.add(stringSubscriptionEntry.getValue());
+        }
+        return list;
+    }
+
+    public List<Subscription> getMatchingSubscriptions(Event<MessageContext> event)
+            throws EventException {
+        final LinkedList<Subscription> list = new LinkedList<Subscription>();
+        for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
             //TODO : pick the filter based on the dialect
             //XPathBasedEventFilter filter = new XPathBasedEventFilter();
             TopicBasedEventFilter filter = new TopicBasedEventFilter();
@@ -105,9 +121,8 @@
                 filter.setSourceXpath(topicXPath);
                 //evaluatedValue = topicXPath.stringValueOf(mc);
             }
-            Event<MessageContext> event = new Event(mc);
             if (filter == null || filter.match(event)) {
-                SynapseSubscription subscription = stringSubscriptionEntry.getValue();
+                Subscription subscription = stringSubscriptionEntry.getValue();
                 Calendar current = Calendar.getInstance(); //Get current date and time
                 if (subscription.getExpires() != null) {
                     if (current.before(subscription.getExpires())) {
@@ -124,56 +139,7 @@
         return list;
     }
 
-    public List<SynapseSubscription> getStaticSubscribers() {
-        LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
-        for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
-            if ((stringSubscriptionEntry.getValue().getSubscriptionData().getProperty(
-                    SynapseEventingConstants.STATIC_ENTRY)).equals("true")) {
-                list.add(stringSubscriptionEntry.getValue());
-            }
-        }
-        return list;
-    }
-
-    @Deprecated
-    public String subscribe(Subscription subscription) throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public boolean unsubscribe(String s) throws EventException {
-        return false;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-
-    public String renew(Subscription subscription) throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public List<Subscription> getSubscriptions() throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public List<Subscription> getAllSubscriptions() throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public List<Subscription> getMatchingSubscriptions(String s) throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public List<Subscription> getSubscribers() throws EventException {
-        LinkedList<Subscription> list = new LinkedList<Subscription>();
-        for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
-            list.add(stringSubscriptionEntry.getValue());
-        }
-        return list;
-    }
-
-    public List<Subscription> getAllSubscribers() throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    public SynapseSubscription getSubscription(String id) {
+    public Subscription getSubscription(String id) {
         return store.get(id);
     }
 
@@ -181,9 +147,6 @@
         return null;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public Subscription getStatus(Subscription subscription) throws EventException {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
-    }
 
     public void init() {
         try {