You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2009/03/15 17:24:28 UTC

svn commit: r754685 - /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java

Author: asanka
Date: Sun Mar 15 16:24:27 2009
New Revision: 754685

URL: http://svn.apache.org/viewvc?rev=754685&view=rev
Log:
Refactoring by removing the complex if conditions.

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java?rev=754685&r1=754684&r2=754685&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/eventing/SynapseEventSource.java Sun Mar 15 16:24:27 2009
@@ -45,7 +45,7 @@
 import java.util.List;
 
 /**
- *  Eventsource that accepts the event requests using a message reciver. 
+ * Eventsource that accepts the event requests using a message reciver.
  */
 public class SynapseEventSource extends SynapseMessageReceiver {
 
@@ -105,147 +105,19 @@
                 .getAxisConfiguration().getParameter(SynapseConstants.SYNAPSE_ENV).getValue();
         org.apache.synapse.MessageContext smc = new Axis2MessageContext(mc, synCfg, synEnv);
         ResponseMessageBuilder messageBuilder = new ResponseMessageBuilder(mc);
+
         if (EventingConstants.WSE_SUBSCRIBE.equals(mc.getWSAAction())) {
             // add new subscription to the SynapseSubscription store through subscription manager
-            SynapseSubscription subscription = SubscriptionMessageBuilder.createSubscription(smc);
-            if (log.isDebugEnabled()) {
-                log.debug("SynapseSubscription request recived  : " + subscription.getId());
-            }
-            if (subscription.getId() != null) {
-                String subID = subscriptionManager.addSubscription(subscription);
-                if (subID != null) {
-                    // Send the subscription responce
-                    if (log.isDebugEnabled()) {
-                        log.debug("Sending subscription response for SynapseSubscription ID : " +
-                                subscription.getId());
-                    }
-                    SOAPEnvelope soapEnvelope =
-                            messageBuilder.genSubscriptionResponse(subscription);
-                    dispatchResponse(soapEnvelope, EventingConstants.WSE_SUbSCRIBE_RESPONSE,
-                            mc, false);
-                } else {
-                    // Send the Fault responce
-                    if (log.isDebugEnabled()) {
-                        log.debug("SynapseSubscription Failed, sending fault response");
-                    }
-                    SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
-                            EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
-                            "Unable to subscribe ", "");
-                    dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                            true);
-                }
-            } else {
-                // Send the Fault responce
-                if (log.isDebugEnabled()) {
-                    log.debug("SynapseSubscription Failed, sending fault response");
-                }
-                SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
-                        SubscriptionMessageBuilder.getErrorCode(),
-                        SubscriptionMessageBuilder.getErrorSubCode(),
-                        SubscriptionMessageBuilder.getErrorReason(), "");
-                dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                        true);
-            }
-
+            processSubscriptionRequest(smc, mc, messageBuilder);
         } else if (EventingConstants.WSE_UNSUBSCRIBE.equals(mc.getWSAAction())) {
-            // Unsubscribe for responce
-            SynapseSubscription subscription =
-                    SubscriptionMessageBuilder.createUnSubscribeMessage(smc);
-            if (log.isDebugEnabled()) {
-                log.debug("UnSubscribe response recived for SynapseSubscription ID : " +
-                        subscription.getId());
-            }
-            if (subscriptionManager.deleteSubscription(subscription.getId())) {
-                //send the response
-                if (log.isDebugEnabled()) {
-                    log.debug("Sending UnSubscribe responce for SynapseSubscription ID : " +
-                            subscription.getId());
-                }
-                SOAPEnvelope soapEnvelope = messageBuilder.genUnSubscribeResponse(subscription);
-                RelatesTo relatesTo = new RelatesTo(subscription.getId());
-                dispatchResponse(soapEnvelope, EventingConstants.WSE_UNSUBSCRIBE_RESPONSE,
-                        mc, false);
-            } else {
-                // Send the Fault responce
-                if (log.isDebugEnabled()) {
-                    log.debug("UnSubscription failed, sending fault repsponse");
-                }
-                SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
-                        EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
-                        "Unable to Unsubscribe", "");
-                dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                        true);
-            }
+            // Unsubscribe the matching subscription
+            processUnSubscribeRequest(smc, mc, messageBuilder);
         } else if (EventingConstants.WSE_GET_STATUS.equals(mc.getWSAAction())) {
-            // Get responce status
-            SynapseSubscription subscription =
-                    SubscriptionMessageBuilder.createGetStatusMessage(smc);
-            if (log.isDebugEnabled()) {
-                log.debug("GetStatus request recived for SynapseSubscription ID : " +
-                        subscription.getId());
-            }
-            subscription = subscriptionManager.getSubscription(subscription.getId());
-            if (subscription != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Sending GetStatus responce for SynapseSubscription ID : " +
-                            subscription.getId());
-                }
-                //send the responce
-                SOAPEnvelope soapEnvelope = messageBuilder.genGetStatusResponse(subscription);
-                RelatesTo relatesTo = new RelatesTo(subscription.getId());
-                dispatchResponse(soapEnvelope, EventingConstants.WSE_GET_STATUS_RESPONSE,
-                        mc, false);
-            } else {
-                // Send the Fault responce
-                if (log.isDebugEnabled()) {
-                    log.debug("GetStatus failed, sending fault response");
-                }
-                SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
-                        EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
-                        "Subscription Not Found", "");
-                dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                        true);
-            }
+            // Responce with the status of the subscription
+            processGetStatusRequest(smc, mc, messageBuilder);
         } else if (EventingConstants.WSE_RENEW.equals(mc.getWSAAction())) {
             // Renew subscription
-            SynapseSubscription subscription =
-                    SubscriptionMessageBuilder.createRenewSubscribeMessage(smc);
-            if (log.isDebugEnabled()) {
-                log.debug("ReNew request recived for SynapseSubscription ID : " +
-                        subscription.getId());
-            }
-            String subID = subscription.getId();
-            if (subID != null) {
-                if (subscriptionManager.renewSubscription(subscription)) {
-                    //send the response
-                    if (log.isDebugEnabled()) {
-                        log.debug("Sending ReNew response for SynapseSubscription ID : " +
-                                subscription.getId());
-                    }
-                    SOAPEnvelope soapEnvelope =
-                            messageBuilder.genRenewSubscriptionResponse(subscription);
-                    RelatesTo relatesTo = new RelatesTo(subscription.getId());
-                    dispatchResponse(soapEnvelope, EventingConstants.WSE_RENEW_RESPONSE,
-                            mc, false);
-                } else {
-                    // Send the Fault responce
-                    if (log.isDebugEnabled()) {
-                        log.debug("ReNew failed, sending fault response");
-                    }
-                    SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
-                            EventingConstants.WSE_FAULT_CODE_RECEIVER, "UnableToRenew",
-                            "Subscription Not Found", "");
-                    dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                            true);
-                }
-            } else {
-                SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
-                        SubscriptionMessageBuilder.getErrorCode(),
-                        SubscriptionMessageBuilder.getErrorSubCode(),
-                        SubscriptionMessageBuilder.getErrorReason(), "");
-                dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
-                        true);
-            }
+            processReNewRequest(smc, mc, messageBuilder);
         } else {
             // Treat as an Event
             if (log.isDebugEnabled()) {
@@ -274,23 +146,25 @@
         rmc.setWSAAction(responseAction);
         rmc.setSoapAction(responseAction);
         rmc.setProperty(SynapseConstants.ISRESPONSE_PROPERTY, Boolean.TRUE);
-        if(faultMessage){
+        if (faultMessage) {
             AxisEngine.sendFault(rmc);
-        }else{
+        } else {
             AxisEngine.send(rmc);
         }
     }
 
     /**
+     * Public method for event dispatching, used by the eventPublisher mediator and eventSource
      *
      * @param msgCtx message context
      */
-    public void dispatchEvents(org.apache.synapse.MessageContext msgCtx){
+    public void dispatchEvents(org.apache.synapse.MessageContext msgCtx) {
         List<SynapseSubscription> subscribers = subscriptionManager.getMatchingSubscribers(msgCtx);
         // Call event dispatcher
         msgCtx.getEnvironment().getExecutorService()
                 .execute(new EventDispatcher(msgCtx, subscribers));
     }
+
     /**
      * Dispatching events async on a different thread
      */
@@ -298,14 +172,16 @@
         org.apache.synapse.MessageContext synCtx;
         List<SynapseSubscription> subscribers;
 
-        EventDispatcher(org.apache.synapse.MessageContext synCtx, List<SynapseSubscription> subscribers) {
+        EventDispatcher(org.apache.synapse.MessageContext synCtx,
+                        List<SynapseSubscription> subscribers) {
             this.synCtx = synCtx;
             this.subscribers = subscribers;
         }
 
         public void run() {
             for (SynapseSubscription subscription : subscribers) {
-                synCtx.setProperty(SynapseConstants.OUT_ONLY, "true");    // Set one way message for events
+                synCtx.setProperty(SynapseConstants.OUT_ONLY,
+                        "true");    // Set one way message for events
                 try {
                     subscription.getEndpoint().send(MessageHelper.cloneMessageContext(synCtx));
                 } catch (AxisFault axisFault) {
@@ -317,4 +193,189 @@
             }
         }
     }
+
+    /**
+     * Process the subscription message request
+     *
+     * @param smc            synapse message context
+     * @param mc             axis2 message context
+     * @param messageBuilder respose message builder
+     * @throws AxisFault
+     */
+    private void processSubscriptionRequest(org.apache.synapse.MessageContext smc,
+                                            MessageContext mc,
+                                            ResponseMessageBuilder messageBuilder)
+            throws AxisFault {
+        SynapseSubscription subscription = SubscriptionMessageBuilder.createSubscription(smc);
+        if (log.isDebugEnabled()) {
+            log.debug("SynapseSubscription request recived  : " + subscription.getId());
+        }
+        if (subscription.getId() != null) {
+            String subID = subscriptionManager.addSubscription(subscription);
+            if (subID != null) {
+                // Send the subscription responce
+                if (log.isDebugEnabled()) {
+                    log.debug("Sending subscription response for SynapseSubscription ID : " +
+                            subscription.getId());
+                }
+                SOAPEnvelope soapEnvelope =
+                        messageBuilder.genSubscriptionResponse(subscription);
+                dispatchResponse(soapEnvelope, EventingConstants.WSE_SUbSCRIBE_RESPONSE,
+                        mc, false);
+            } else {
+                // Send the Fault responce
+                if (log.isDebugEnabled()) {
+                    log.debug("SynapseSubscription Failed, sending fault response");
+                }
+                SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+                        EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
+                        "Unable to subscribe ", "");
+                dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+                        true);
+            }
+        } else {
+            // Send the Fault responce
+            if (log.isDebugEnabled()) {
+                log.debug("SynapseSubscription Failed, sending fault response");
+            }
+            SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+                    SubscriptionMessageBuilder.getErrorCode(),
+                    SubscriptionMessageBuilder.getErrorSubCode(),
+                    SubscriptionMessageBuilder.getErrorReason(), "");
+            dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+                    true);
+        }
+    }
+
+    /**
+     * Process the UnSubscribe message request
+     *
+     * @param smc            synapse message context
+     * @param mc             axis2 message context
+     * @param messageBuilder respose message builder
+     * @throws AxisFault
+     */
+    private void processUnSubscribeRequest(org.apache.synapse.MessageContext smc,
+                                           MessageContext mc,
+                                           ResponseMessageBuilder messageBuilder) throws AxisFault {
+        SynapseSubscription subscription =
+                SubscriptionMessageBuilder.createUnSubscribeMessage(smc);
+        if (log.isDebugEnabled()) {
+            log.debug("UnSubscribe response recived for SynapseSubscription ID : " +
+                    subscription.getId());
+        }
+        if (subscriptionManager.deleteSubscription(subscription.getId())) {
+            //send the response
+            if (log.isDebugEnabled()) {
+                log.debug("Sending UnSubscribe responce for SynapseSubscription ID : " +
+                        subscription.getId());
+            }
+            SOAPEnvelope soapEnvelope = messageBuilder.genUnSubscribeResponse(subscription);
+            RelatesTo relatesTo = new RelatesTo(subscription.getId());
+            dispatchResponse(soapEnvelope, EventingConstants.WSE_UNSUBSCRIBE_RESPONSE,
+                    mc, false);
+        } else {
+            // Send the Fault responce
+            if (log.isDebugEnabled()) {
+                log.debug("UnSubscription failed, sending fault repsponse");
+            }
+            SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+                    EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
+                    "Unable to Unsubscribe", "");
+            dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+                    true);
+        }
+    }
+
+    /**
+     * Process the GetStatus message request
+     *
+     * @param smc            synapse message context
+     * @param mc             axis2 message context
+     * @param messageBuilder respose message builder
+     * @throws AxisFault
+     */
+    private void processGetStatusRequest(org.apache.synapse.MessageContext smc,
+                                         MessageContext mc,
+                                         ResponseMessageBuilder messageBuilder) throws AxisFault {
+        SynapseSubscription subscription =
+                SubscriptionMessageBuilder.createGetStatusMessage(smc);
+        if (log.isDebugEnabled()) {
+            log.debug("GetStatus request recived for SynapseSubscription ID : " +
+                    subscription.getId());
+        }
+        subscription = subscriptionManager.getSubscription(subscription.getId());
+        if (subscription != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Sending GetStatus responce for SynapseSubscription ID : " +
+                        subscription.getId());
+            }
+            //send the responce
+            SOAPEnvelope soapEnvelope = messageBuilder.genGetStatusResponse(subscription);
+            RelatesTo relatesTo = new RelatesTo(subscription.getId());
+            dispatchResponse(soapEnvelope, EventingConstants.WSE_GET_STATUS_RESPONSE,
+                    mc, false);
+        } else {
+            // Send the Fault responce
+            if (log.isDebugEnabled()) {
+                log.debug("GetStatus failed, sending fault response");
+            }
+            SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+                    EventingConstants.WSE_FAULT_CODE_RECEIVER, "EventSourceUnableToProcess",
+                    "Subscription Not Found", "");
+            dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+                    true);
+        }
+    }
+
+    /**
+     * Process the ReNew message request
+     *
+     * @param smc            synapse message context
+     * @param mc             axis2 message context
+     * @param messageBuilder respose message builder
+     * @throws AxisFault
+     */
+    private void processReNewRequest(org.apache.synapse.MessageContext smc,
+                                     MessageContext mc,
+                                     ResponseMessageBuilder messageBuilder) throws AxisFault {
+        SynapseSubscription subscription =
+                SubscriptionMessageBuilder.createRenewSubscribeMessage(smc);
+        if (log.isDebugEnabled()) {
+            log.debug("ReNew request recived for SynapseSubscription ID : " +
+                    subscription.getId());
+        }
+        String subID = subscription.getId();
+        if (subID != null) {
+            if (subscriptionManager.renewSubscription(subscription)) {
+                //send the response
+                if (log.isDebugEnabled()) {
+                    log.debug("Sending ReNew response for SynapseSubscription ID : " +
+                            subscription.getId());
+                }
+                SOAPEnvelope soapEnvelope =
+                        messageBuilder.genRenewSubscriptionResponse(subscription);
+                RelatesTo relatesTo = new RelatesTo(subscription.getId());
+                dispatchResponse(soapEnvelope, EventingConstants.WSE_RENEW_RESPONSE,
+                        mc, false);
+            } else {
+                // Send the Fault responce
+                if (log.isDebugEnabled()) {
+                    log.debug("ReNew failed, sending fault response");
+                }
+                SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+                        EventingConstants.WSE_FAULT_CODE_RECEIVER, "UnableToRenew",
+                        "Subscription Not Found", "");
+                dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+                        true);
+            }
+        } else {
+            SOAPEnvelope soapEnvelope = messageBuilder.genFaultResponse(mc,
+                    SubscriptionMessageBuilder.getErrorCode(),
+                    SubscriptionMessageBuilder.getErrorSubCode(),
+                    SubscriptionMessageBuilder.getErrorReason(), "");
+            dispatchResponse(soapEnvelope, EventingConstants.WSA_FAULT, mc,
+                    true);
+        }
+    }
 }