You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2009/03/30 14:28:30 UTC
svn commit: r759941 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse:
config/xml/eventing/ eventing/ eventing/managers/
Author: asanka
Date: Mon Mar 30 12:28:30 2009
New Revision: 759941
URL: http://svn.apache.org/viewvc?rev=759941&view=rev
Log:
Removed the duplicated methods from SynapseSubscriptionManager; the matching-subscriptions operation now takes an Event<T> as its input parameter.
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceFactory.java Mon Mar 30 12:28:30 2009
@@ -35,6 +35,7 @@
import org.apache.synapse.eventing.SynapseSubscriptionManager;
import org.apache.synapse.eventing.filters.XPathBasedEventFilter;
import org.wso2.eventing.SubscriptionData;
+import org.wso2.eventing.exceptions.EventException;
import javax.xml.namespace.QName;
import java.util.Iterator;
@@ -125,7 +126,13 @@
handleException(
"SynapseSubscription Manager has not been specified for the event source");
}
- createStaticSubscriptions(elem, eventSource);
+
+ try {
+ createStaticSubscriptions(elem, eventSource);
+ } catch (EventException e) {
+ handleException("Static subscription creation failure",e);
+ }
+
return eventSource;
}
@@ -146,7 +153,8 @@
* @param synapseEventSource
*/
private static void createStaticSubscriptions(OMElement elem,
- SynapseEventSource synapseEventSource) {
+ SynapseEventSource synapseEventSource)
+ throws EventException {
for (Iterator iterator = elem.getChildrenWithName(SUBSCRIPTION_QNAME);
iterator.hasNext();) {
SynapseSubscription synapseSubscription = new SynapseSubscription();
@@ -206,7 +214,7 @@
}
synapseSubscription.getSubscriptionData()
.setProperty(SynapseEventingConstants.STATIC_ENTRY, "true");
- synapseEventSource.getSubscriptionManager().addSubscription(synapseSubscription);
+ synapseEventSource.getSubscriptionManager().subscribe(synapseSubscription);
}
}
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/eventing/EventSourceSerializer.java Mon Mar 30 12:28:30 2009
@@ -28,6 +28,7 @@
import org.apache.synapse.eventing.SynapseEventSource;
import org.apache.synapse.eventing.SynapseEventingConstants;
import org.apache.synapse.eventing.SynapseSubscription;
+import org.wso2.eventing.Subscription;
import java.util.Iterator;
import java.util.List;
@@ -74,11 +75,11 @@
}
evenSourceElem.addChild(subManagerElem);
// Adding static subscriptions
- List<SynapseSubscription> staticSubscriptionList =
+ List<Subscription> staticSubscriptionList =
eventSource.getSubscriptionManager().getStaticSubscribers();
- for (Iterator<SynapseSubscription> iterator = staticSubscriptionList.iterator();
+ for (Iterator<Subscription> iterator = staticSubscriptionList.iterator();
iterator.hasNext();) {
- SynapseSubscription staticSubscription = iterator.next();
+ Subscription staticSubscription = iterator.next();
OMElement staticSubElem =
fac.createOMElement("subscription", XMLConfigConstants.SYNAPSE_OMNAMESPACE);
staticSubElem.addAttribute(
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java Mon Mar 30 12:28:30 2009
@@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.endpoints.AddressEndpoint;
import org.apache.synapse.endpoints.EndpointDefinition;
@@ -43,6 +44,9 @@
import org.apache.synapse.eventing.builders.SubscriptionMessageBuilder;
import org.apache.synapse.util.MessageHelper;
import org.wso2.eventing.EventingConstants;
+import org.wso2.eventing.Subscription;
+import org.wso2.eventing.Event;
+import org.wso2.eventing.exceptions.EventException;
import javax.xml.namespace.QName;
import java.util.List;
@@ -110,25 +114,28 @@
org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv);
// initialisze the response message builder using the message context
ResponseMessageBuilder messageBuilder = new ResponseMessageBuilder(mc);
-
- if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
- // add new subscription to the SynapseSubscription store through subscription manager
- processSubscriptionRequest(mc, messageBuilder);
- } else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
- // Unsubscribe the matching subscription
- processUnSubscribeRequest(mc, messageBuilder);
- } else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
- // Responce with the status of the subscription
- processGetStatusRequest(mc, messageBuilder);
- } else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
- // Renew subscription
- processReNewRequest(mc, messageBuilder);
- } else {
- // Treat as an Event
- if (log.isDebugEnabled()) {
- log.debug("Event recived");
+ try {
+ if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
+ // add new subscription to the SynapseSubscription store through subscription manager
+ processSubscriptionRequest(mc, messageBuilder);
+ } else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
+ // Unsubscribe the matching subscription
+ processUnSubscribeRequest(mc, messageBuilder);
+ } else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
+ // Responce with the status of the subscription
+ processGetStatusRequest(mc, messageBuilder);
+ } else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
+ // Renew subscription
+ processReNewRequest(mc, messageBuilder);
+ } else {
+ // Treat as an Event
+ if (log.isDebugEnabled()) {
+ log.debug("Event recived");
+ }
+ dispatchEvents(smc);
}
- dispatchEvents(smc);
+ } catch (EventException e) {
+ handleException("Subscription manager processing error",e);
}
}
@@ -164,8 +171,15 @@
* @param msgCtx message context
*/
public void dispatchEvents(org.apache.synapse.MessageContext msgCtx) {
+ Event<org.apache.synapse.MessageContext> event = new Event(msgCtx);
+
+ List<Subscription> subscribers = null;
+ try {
+ subscribers = subscriptionManager.getMatchingSubscriptions(event);
+ } catch (EventException e) {
+ handleException("Matching subscriptions fetching error",e);
+ }
- List<SynapseSubscription> subscribers = subscriptionManager.getMatchingSubscribers(msgCtx);
// Call event dispatcher
msgCtx.getEnvironment().getExecutorService()
.execute(new EventDispatcher(msgCtx, subscribers));
@@ -176,16 +190,16 @@
*/
class EventDispatcher implements Runnable {
private org.apache.synapse.MessageContext synCtx;
- private List<SynapseSubscription> subscribers;
+ private List<Subscription> subscribers;
EventDispatcher(org.apache.synapse.MessageContext synCtx,
- List<SynapseSubscription> subscribers) {
+ List<Subscription> subscribers) {
this.synCtx = synCtx;
this.subscribers = subscribers;
}
public void run() {
- for (SynapseSubscription subscription : subscribers) {
+ for (Subscription subscription : subscribers) {
synCtx.setProperty(SynapseConstants.OUT_ONLY,
"true"); // Set one way message for events
try {
@@ -210,13 +224,13 @@
*/
private void processSubscriptionRequest(MessageContext mc,
ResponseMessageBuilder messageBuilder)
- throws AxisFault {
+ throws AxisFault, EventException {
SynapseSubscription subscription = SubscriptionMessageBuilder.createSubscription(mc);
if (log.isDebugEnabled()) {
log.debug("SynapseSubscription request recived : " + subscription.getId());
}
if (subscription.getId() != null) {
- String subID = subscriptionManager.addSubscription(subscription);
+ String subID = subscriptionManager.subscribe(subscription);
if (subID != null) {
// Send the subscription responce
if (log.isDebugEnabled()) {
@@ -260,14 +274,15 @@
* @throws AxisFault
*/
private void processUnSubscribeRequest(MessageContext mc,
- ResponseMessageBuilder messageBuilder) throws AxisFault {
+ ResponseMessageBuilder messageBuilder)
+ throws AxisFault, EventException {
SynapseSubscription subscription =
SubscriptionMessageBuilder.createUnSubscribeMessage(mc);
if (log.isDebugEnabled()) {
log.debug("UnSubscribe response recived for SynapseSubscription ID : " +
subscription.getId());
}
- if (subscriptionManager.deleteSubscription(subscription.getId())) {
+ if (subscriptionManager.unsubscribe(subscription.getId())) {
//send the response
if (log.isDebugEnabled()) {
log.debug("Sending UnSubscribe responce for SynapseSubscription ID : " +
@@ -297,14 +312,16 @@
* @throws AxisFault
*/
private void processGetStatusRequest(MessageContext mc,
- ResponseMessageBuilder messageBuilder) throws AxisFault {
+ ResponseMessageBuilder messageBuilder)
+ throws AxisFault, EventException {
SynapseSubscription subscription =
SubscriptionMessageBuilder.createGetStatusMessage(mc);
if (log.isDebugEnabled()) {
log.debug("GetStatus request recived for SynapseSubscription ID : " +
subscription.getId());
}
- subscription = subscriptionManager.getSubscription(subscription.getId());
+ subscription =
+ (SynapseSubscription) subscriptionManager.getSubscription(subscription.getId());
if (subscription != null) {
if (log.isDebugEnabled()) {
log.debug("Sending GetStatus responce for SynapseSubscription ID : " +
@@ -335,7 +352,8 @@
* @throws AxisFault
*/
private void processReNewRequest(MessageContext mc,
- ResponseMessageBuilder messageBuilder) throws AxisFault {
+ ResponseMessageBuilder messageBuilder)
+ throws AxisFault, EventException {
SynapseSubscription subscription =
SubscriptionMessageBuilder.createRenewSubscribeMessage(mc);
if (log.isDebugEnabled()) {
@@ -344,7 +362,7 @@
}
String subID = subscription.getId();
if (subID != null) {
- if (subscriptionManager.renewSubscription(subscription)) {
+ if (subscriptionManager.renew(subscription)) {
//send the response
if (log.isDebugEnabled()) {
log.debug("Sending ReNew response for SynapseSubscription ID : " +
@@ -387,4 +405,9 @@
endpoint.setDefinition(def);
return endpoint;
}
+
+ private void handleException(String message, Exception e) {
+ log.error(message, e);
+ throw new SynapseException(message, e);
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseSubscriptionManager.java Mon Mar 30 12:28:30 2009
@@ -21,6 +21,7 @@
import org.apache.synapse.MessageContext;
import org.wso2.eventing.SubscriptionManager;
+import org.wso2.eventing.Subscription;
import java.util.Collection;
import java.util.HashMap;
@@ -30,65 +31,17 @@
/**
* Subscription Manager for Synapse
*/
-public abstract class SynapseSubscriptionManager implements SubscriptionManager {
+public abstract class SynapseSubscriptionManager implements SubscriptionManager<MessageContext> {
private final Map<String, String> properties = new HashMap<String, String>();
/**
- * Return all Active subscriptions
- *
- * @return List of subscriptions
- */
- public abstract List<SynapseSubscription> getSynapseSubscribers();
-
- /**
- * Get the matching subscriptions for a given filter.
- *
- * @param mc Message context
- * @return List of subscriptions
- */
- public abstract List<SynapseSubscription> getMatchingSubscribers(MessageContext mc);
-
- /**
* Get the static subscription defined in the configuration
*
* @return List of static subscriptions
*/
- public abstract List<SynapseSubscription> getStaticSubscribers();
+ public abstract List<Subscription> getStaticSubscribers();
- /**
- * Get a subscription by subscription ID
- *
- * @param id subscription ID
- * @return SynapseSubscription
- */
- public abstract SynapseSubscription getSubscription(String id);
-
- /**
- * Add a new subscription to the store
- *
- * @param subs Subscription object
- * @return String subscription ID
- */
- public abstract String addSubscription(SynapseSubscription subs);
-
- /**
- * Delete a given subscription
- *
- * @param id Subscription ID
- * @return True|False
- */
- public abstract boolean deleteSubscription(String id);
-
- /**
- * Renew a given subscription
- *
- * @param subscription subscription object
- * @return True|False
- */
- public abstract boolean renewSubscription(SynapseSubscription subscription);
-
- public abstract void init();
public void addProperty(String name, String value) {
properties.put(name, value);
@@ -100,5 +53,5 @@
public String getPropertyValue(String name) {
return properties.get(name);
- }
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java?rev=759941&r1=759940&r2=759941&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/managers/DefaultInMemorySubscriptionManager.java Mon Mar 30 12:28:30 2009
@@ -32,6 +32,7 @@
import org.jaxen.JaxenException;
import org.wso2.eventing.Subscription;
import org.wso2.eventing.Event;
+import org.wso2.eventing.EventFilter;
import org.wso2.eventing.exceptions.EventException;
import java.util.Calendar;
@@ -45,22 +46,34 @@
*/
public class DefaultInMemorySubscriptionManager extends SynapseSubscriptionManager {
- private final Map<String, SynapseSubscription> store =
- new ConcurrentHashMap<String, SynapseSubscription>();
+ private final Map<String, Subscription> store =
+ new ConcurrentHashMap<String, Subscription>();
private String topicHeaderName;
private String topicHeaderNS;
private SynapseXPath topicXPath;
private static final Log log = LogFactory.getLog(DefaultInMemorySubscriptionManager.class);
- public String addSubscription(SynapseSubscription subs) {
- if (subs.getId() == null) {
- subs.setId(org.apache.axiom.om.util.UUIDGenerator.getUUID());
+ public List<Subscription> getStaticSubscribers() {
+ LinkedList<Subscription> list = new LinkedList<Subscription>();
+ for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
+ if ((stringSubscriptionEntry.getValue().getSubscriptionData().getProperty(
+ SynapseEventingConstants.STATIC_ENTRY)).equals("true")) {
+ list.add(stringSubscriptionEntry.getValue());
+ }
+ }
+ return list;
+ }
+
+ public String subscribe(Subscription subscription) throws EventException {
+ if (subscription.getId() == null) {
+ subscription.setId(org.apache.axiom.om.util.UUIDGenerator.getUUID());
}
- store.put(subs.getId(), subs);
- return subs.getId();
+ store.put(subscription.getId(), subscription);
+ return subscription.getId();
+
}
- public boolean deleteSubscription(String id) {
+ public boolean unsubscribe(String id) throws EventException {
if (store.containsKey(id)) {
store.remove(id);
return true;
@@ -69,14 +82,9 @@
}
}
- /**
- * Renew the subscription by setting the expire date time
- *
- * @param subscription
- * @return
- */
- public boolean renewSubscription(SynapseSubscription subscription) {
- SynapseSubscription subscriptionOld = getSubscription(subscription.getId());
+
+ public boolean renew(Subscription subscription) throws EventException {
+ Subscription subscriptionOld = getSubscription(subscription.getId());
if (subscriptionOld != null) {
subscriptionOld.setExpires(subscription.getExpires());
return true;
@@ -85,18 +93,26 @@
}
}
- public List<SynapseSubscription> getSynapseSubscribers() {
- LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
- for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
+ public List<Subscription> getSubscriptions() throws EventException {
+ LinkedList<Subscription> list = new LinkedList<Subscription>();
+ for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
list.add(stringSubscriptionEntry.getValue());
}
return list;
}
- public List<SynapseSubscription> getMatchingSubscribers(MessageContext mc) {
- final LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
- String evaluatedValue = null;
- for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
+ public List<Subscription> getAllSubscriptions() throws EventException {
+ LinkedList<Subscription> list = new LinkedList<Subscription>();
+ for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
+ list.add(stringSubscriptionEntry.getValue());
+ }
+ return list;
+ }
+
+ public List<Subscription> getMatchingSubscriptions(Event<MessageContext> event)
+ throws EventException {
+ final LinkedList<Subscription> list = new LinkedList<Subscription>();
+ for (Map.Entry<String, Subscription> stringSubscriptionEntry : store.entrySet()) {
//TODO : pick the filter based on the dialect
//XPathBasedEventFilter filter = new XPathBasedEventFilter();
TopicBasedEventFilter filter = new TopicBasedEventFilter();
@@ -105,9 +121,8 @@
filter.setSourceXpath(topicXPath);
//evaluatedValue = topicXPath.stringValueOf(mc);
}
- Event<MessageContext> event = new Event(mc);
if (filter == null || filter.match(event)) {
- SynapseSubscription subscription = stringSubscriptionEntry.getValue();
+ Subscription subscription = stringSubscriptionEntry.getValue();
Calendar current = Calendar.getInstance(); //Get current date and time
if (subscription.getExpires() != null) {
if (current.before(subscription.getExpires())) {
@@ -124,56 +139,7 @@
return list;
}
- public List<SynapseSubscription> getStaticSubscribers() {
- LinkedList<SynapseSubscription> list = new LinkedList<SynapseSubscription>();
- for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
- if ((stringSubscriptionEntry.getValue().getSubscriptionData().getProperty(
- SynapseEventingConstants.STATIC_ENTRY)).equals("true")) {
- list.add(stringSubscriptionEntry.getValue());
- }
- }
- return list;
- }
-
- @Deprecated
- public String subscribe(Subscription subscription) throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public boolean unsubscribe(String s) throws EventException {
- return false; //To change body of implemented methods use File | Settings | File Templates.
- }
-
-
- public String renew(Subscription subscription) throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public List<Subscription> getSubscriptions() throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public List<Subscription> getAllSubscriptions() throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public List<Subscription> getMatchingSubscriptions(String s) throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public List<Subscription> getSubscribers() throws EventException {
- LinkedList<Subscription> list = new LinkedList<Subscription>();
- for (Map.Entry<String, SynapseSubscription> stringSubscriptionEntry : store.entrySet()) {
- list.add(stringSubscriptionEntry.getValue());
- }
- return list;
- }
-
- public List<Subscription> getAllSubscribers() throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
-
- public SynapseSubscription getSubscription(String id) {
+ public Subscription getSubscription(String id) {
return store.get(id);
}
@@ -181,9 +147,6 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public Subscription getStatus(Subscription subscription) throws EventException {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
public void init() {
try {