You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2009/03/15 17:24:28 UTC
svn commit: r754685 -
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
Author: asanka
Date: Sun Mar 15 16:24:27 2009
New Revision: 754685
URL: http://svn.apache.org/viewvc?rev=754685&view=rev
Log:
Refactoring by removing the complex if conditions.
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java?rev=754685&r1=754684&r2=754685&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java Sun Mar 15 16:24:27 2009
@@ -45,7 +45,7 @@
import java.util.List;
/**
- * Eventsource that accepts the event requests using a message reciver.
+ * Eventsource that accepts the event requests using a message reciver.
*/
public class SynapseEventSource extends SynapseMessageReceiver {
@@ -105,147 +105,19 @@
.getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_ENV).getValue();
org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv);
ResponseMessageBuilder messageBuilder = new ResponseMessageBuilder(mc);
+
if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
// add new subscription to the SynapseSubscription store through subscription manager
- SynapseSubscription subscription = SubscriptionMessageBuilder.createSubscription(smc);
- if (log.isDebugEnabled()) {
- log.debug("SynapseSubscription request recived : " + subscription.getId());
- }
- if (subscription.getId() != null) {
- String subID = subscriptionManager.addSubscription(subscription);
- if (subID != null) {
- // Send the subscription responce
- if (log.isDebugEnabled()) {
- log.debug("Sending subscription response for SynapseSubscription ID : " +
- subscription.getId());
- }
- SOAPEnvelope soapEnvelope =
- messageBuilder.genSubscriptionResponse(subscription);
- dispatchResponse(soapEnvelope, EventingConstants.WSE_SUbSCRIBE_RESPONSE,
- mc, false);
- } else {
- // Send the Fault responce
- if (log.isDebugEnabled()) {
- log.debug("SynapseSubscription Failed, sending fault response");
- }
- SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
- EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
- "Unable to subscribe ", "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
- }
- } else {
- // Send the Fault responce
- if (log.isDebugEnabled()) {
- log.debug("SynapseSubscription Failed, sending fault response");
- }
- SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
- SubscriptionMessageBuilder.getErrorCode(),
- SubscriptionMessageBuilder.getErrorSubCode(),
- SubscriptionMessageBuilder.getErrorReason(), "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
- }
-
+ processSubscriptionRequest(smc, mc, messageBuilder);
} else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
- // Unsubscribe for responce
- SynapseSubscription subscription =
- SubscriptionMessageBuilder.createUnSubscribeMessage(smc);
- if (log.isDebugEnabled()) {
- log.debug("UnSubscribe response recived for SynapseSubscription ID : " +
- subscription.getId());
- }
- if (subscriptionManager.deleteSubscription(subscription.getId())) {
- //send the response
- if (log.isDebugEnabled()) {
- log.debug("Sending UnSubscribe responce for SynapseSubscription ID : " +
- subscription.getId());
- }
- SOAPEnvelope soapEnvelope = messageBuilder.genUnSubscribeResponse(subscription);
- RelatesTo relatesTo = new RelatesTo(subscription.getId());
- dispatchResponse(soapEnvelope, EventingConstants.WSE_UNSUBSCRIBE_RESPONSE,
- mc, false);
- } else {
- // Send the Fault responce
- if (log.isDebugEnabled()) {
- log.debug("UnSubscription failed, sending fault repsponse");
- }
- SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
- EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
- "Unable to Unsubscribe", "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
- }
+ // Unsubscribe the matching subscription
+ processUnSubscribeRequest(smc, mc, messageBuilder);
} else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
- // Get responce status
- SynapseSubscription subscription =
- SubscriptionMessageBuilder.createGetStatusMessage(smc);
- if (log.isDebugEnabled()) {
- log.debug("GetStatus request recived for SynapseSubscription ID : " +
- subscription.getId());
- }
- subscription = subscriptionManager.getSubscription(subscription.getId());
- if (subscription != null) {
- if (log.isDebugEnabled()) {
- log.debug("Sending GetStatus responce for SynapseSubscription ID : " +
- subscription.getId());
- }
- //send the responce
- SOAPEnvelope soapEnvelope = messageBuilder.genGetStatusResponse(subscription);
- RelatesTo relatesTo = new RelatesTo(subscription.getId());
- dispatchResponse(soapEnvelope, EventingConstants.WSE_GET_STATUS_RESPONSE,
- mc, false);
- } else {
- // Send the Fault responce
- if (log.isDebugEnabled()) {
- log.debug("GetStatus failed, sending fault response");
- }
- SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
- EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
- "Subscription Not Found", "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
- }
+ // Responce with the status of the subscription
+ processGetStatusRequest(smc, mc, messageBuilder);
} else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
// Renew subscription
- SynapseSubscription subscription =
- SubscriptionMessageBuilder.createRenewSubscribeMessage(smc);
- if (log.isDebugEnabled()) {
- log.debug("ReNew request recived for SynapseSubscription ID : " +
- subscription.getId());
- }
- String subID = subscription.getId();
- if (subID != null) {
- if (subscriptionManager.renewSubscription(subscription)) {
- //send the response
- if (log.isDebugEnabled()) {
- log.debug("Sending ReNew response for SynapseSubscription ID : " +
- subscription.getId());
- }
- SOAPEnvelope soapEnvelope =
- messageBuilder.genRenewSubscriptionResponse(subscription);
- RelatesTo relatesTo = new RelatesTo(subscription.getId());
- dispatchResponse(soapEnvelope, EventingConstants.WSE_RENEW_RESPONSE,
- mc, false);
- } else {
- // Send the Fault responce
- if (log.isDebugEnabled()) {
- log.debug("ReNew failed, sending fault response");
- }
- SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
- EventingConstants.WSE_FAULT_CODE_RECEIVER, "UnableToRenew",
- "Subscription Not Found", "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
- }
- } else {
- SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
- SubscriptionMessageBuilder.getErrorCode(),
- SubscriptionMessageBuilder.getErrorSubCode(),
- SubscriptionMessageBuilder.getErrorReason(), "");
- dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
- true);
- }
+ processReNewRequest(smc, mc, messageBuilder);
} else {
// Treat as an Event
if (log.isDebugEnabled()) {
@@ -274,23 +146,25 @@
rmc.setWSAAction(responseAction);
rmc.setSoapAction(responseAction);
rmc.setProperty(SynapseConstants.ISRESPONSE_PROPERTY, Boolean.TRUE);
- if(faultMessage){
+ if (faultMessage) {
AxisEngine.sendFault(rmc);
- }else{
+ } else {
AxisEngine.send(rmc);
}
}
/**
+ * Public method for event dispatching, used by the eventPublisher mediator and eventSource
*
* @param msgCtx message context
*/
- public void dispatchEvents(org.apache.synapse.MessageContext msgCtx){
+ public void dispatchEvents(org.apache.synapse.MessageContext msgCtx) {
List<SynapseSubscription> subscribers = subscriptionManager.getMatchingSubscribers(msgCtx);
// Call event dispatcher
msgCtx.getEnvironment().getExecutorService()
.execute(new EventDispatcher(msgCtx, subscribers));
}
+
/**
* Dispatching events async on a different thread
*/
@@ -298,14 +172,16 @@
org.apache.synapse.MessageContext synCtx;
List<SynapseSubscription> subscribers;
- EventDispatcher(org.apache.synapse.MessageContext synCtx, List<SynapseSubscription> subscribers) {
+ EventDispatcher(org.apache.synapse.MessageContext synCtx,
+ List<SynapseSubscription> subscribers) {
this.synCtx = synCtx;
this.subscribers = subscribers;
}
public void run() {
for (SynapseSubscription subscription : subscribers) {
- synCtx.setProperty(SynapseConstants.OUT_ONLY, "true"); // Set one way message for events
+ synCtx.setProperty(SynapseConstants.OUT_ONLY,
+ "true"); // Set one way message for events
try {
subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
} catch (AxisFault axisFault) {
@@ -317,4 +193,189 @@
}
}
}
+
+ /**
+ * Process the subscription message request
+ *
+ * @param smc synapse message context
+ * @param mc axis2 message context
+ * @param messageBuilder respose message builder
+ * @throws AxisFault
+ */
+ private void processSubscriptionRequest(org.apache.synapse.MessageContext smc,
+ MessageContext mc,
+ ResponseMessageBuilder messageBuilder)
+ throws AxisFault {
+ SynapseSubscription subscription = SubscriptionMessageBuilder.createSubscription(smc);
+ if (log.isDebugEnabled()) {
+ log.debug("SynapseSubscription request recived : " + subscription.getId());
+ }
+ if (subscription.getId() != null) {
+ String subID = subscriptionManager.addSubscription(subscription);
+ if (subID != null) {
+ // Send the subscription responce
+ if (log.isDebugEnabled()) {
+ log.debug("Sending subscription response for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ SOAPEnvelope soapEnvelope =
+ messageBuilder.genSubscriptionResponse(subscription);
+ dispatchResponse(soapEnvelope, EventingConstants.WSE_SUbSCRIBE_RESPONSE,
+ mc, false);
+ } else {
+ // Send the Fault responce
+ if (log.isDebugEnabled()) {
+ log.debug("SynapseSubscription Failed, sending fault response");
+ }
+ SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+ EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
+ "Unable to subscribe ", "");
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+ true);
+ }
+ } else {
+ // Send the Fault responce
+ if (log.isDebugEnabled()) {
+ log.debug("SynapseSubscription Failed, sending fault response");
+ }
+ SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+ SubscriptionMessageBuilder.getErrorCode(),
+ SubscriptionMessageBuilder.getErrorSubCode(),
+ SubscriptionMessageBuilder.getErrorReason(), "");
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+ true);
+ }
+ }
+
+ /**
+ * Process the UnSubscribe message request
+ *
+ * @param smc synapse message context
+ * @param mc axis2 message context
+ * @param messageBuilder respose message builder
+ * @throws AxisFault
+ */
+ private void processUnSubscribeRequest(org.apache.synapse.MessageContext smc,
+ MessageContext mc,
+ ResponseMessageBuilder messageBuilder) throws AxisFault {
+ SynapseSubscription subscription =
+ SubscriptionMessageBuilder.createUnSubscribeMessage(smc);
+ if (log.isDebugEnabled()) {
+ log.debug("UnSubscribe response recived for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ if (subscriptionManager.deleteSubscription(subscription.getId())) {
+ //send the response
+ if (log.isDebugEnabled()) {
+ log.debug("Sending UnSubscribe responce for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ SOAPEnvelope soapEnvelope = messageBuilder.genUnSubscribeResponse(subscription);
+ RelatesTo relatesTo = new RelatesTo(subscription.getId());
+ dispatchResponse(soapEnvelope, EventingConstants.WSE_UNSUBSCRIBE_RESPONSE,
+ mc, false);
+ } else {
+ // Send the Fault responce
+ if (log.isDebugEnabled()) {
+ log.debug("UnSubscription failed, sending fault repsponse");
+ }
+ SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+ EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
+ "Unable to Unsubscribe", "");
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+ true);
+ }
+ }
+
+ /**
+ * Process the GetStatus message request
+ *
+ * @param smc synapse message context
+ * @param mc axis2 message context
+ * @param messageBuilder respose message builder
+ * @throws AxisFault
+ */
+ private void processGetStatusRequest(org.apache.synapse.MessageContext smc,
+ MessageContext mc,
+ ResponseMessageBuilder messageBuilder) throws AxisFault {
+ SynapseSubscription subscription =
+ SubscriptionMessageBuilder.createGetStatusMessage(smc);
+ if (log.isDebugEnabled()) {
+ log.debug("GetStatus request recived for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ subscription = subscriptionManager.getSubscription(subscription.getId());
+ if (subscription != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Sending GetStatus responce for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ //send the responce
+ SOAPEnvelope soapEnvelope = messageBuilder.genGetStatusResponse(subscription);
+ RelatesTo relatesTo = new RelatesTo(subscription.getId());
+ dispatchResponse(soapEnvelope, EventingConstants.WSE_GET_STATUS_RESPONSE,
+ mc, false);
+ } else {
+ // Send the Fault responce
+ if (log.isDebugEnabled()) {
+ log.debug("GetStatus failed, sending fault response");
+ }
+ SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+ EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
+ "Subscription Not Found", "");
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+ true);
+ }
+ }
+
+ /**
+ * Process the ReNew message request
+ *
+ * @param smc synapse message context
+ * @param mc axis2 message context
+ * @param messageBuilder respose message builder
+ * @throws AxisFault
+ */
+ private void processReNewRequest(org.apache.synapse.MessageContext smc,
+ MessageContext mc,
+ ResponseMessageBuilder messageBuilder) throws AxisFault {
+ SynapseSubscription subscription =
+ SubscriptionMessageBuilder.createRenewSubscribeMessage(smc);
+ if (log.isDebugEnabled()) {
+ log.debug("ReNew request recived for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ String subID = subscription.getId();
+ if (subID != null) {
+ if (subscriptionManager.renewSubscription(subscription)) {
+ //send the response
+ if (log.isDebugEnabled()) {
+ log.debug("Sending ReNew response for SynapseSubscription ID : " +
+ subscription.getId());
+ }
+ SOAPEnvelope soapEnvelope =
+ messageBuilder.genRenewSubscriptionResponse(subscription);
+ RelatesTo relatesTo = new RelatesTo(subscription.getId());
+ dispatchResponse(soapEnvelope, EventingConstants.WSE_RENEW_RESPONSE,
+ mc, false);
+ } else {
+ // Send the Fault responce
+ if (log.isDebugEnabled()) {
+ log.debug("ReNew failed, sending fault response");
+ }
+ SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+ EventingConstants.WSE_FAULT_CODE_RECEIVER, "UnableToRenew",
+ "Subscription Not Found", "");
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+ true);
+ }
+ } else {
+ SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+ SubscriptionMessageBuilder.getErrorCode(),
+ SubscriptionMessageBuilder.getErrorSubCode(),
+ SubscriptionMessageBuilder.getErrorReason(), "");
+ dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+ true);
+ }
+ }
}