You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/10/07 20:38:47 UTC

svn commit: r702579 [4/6] - in /synapse/trunk/java/modules: core/src/main/java/org/apache/synapse/ core/src/main/java/org/apache/synapse/config/ core/src/main/java/org/apache/synapse/config/xml/ core/src/main/java/org/apache/synapse/config/xml/endpoint...

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java Tue Oct  7 11:38:44 2008
@@ -19,22 +19,18 @@
 
 package org.apache.synapse.endpoints;
 
-import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.context.OperationContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.FaultHandler;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseConstants;
-import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
 import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
 import org.apache.synapse.endpoints.dispatch.Dispatcher;
-import org.apache.synapse.endpoints.dispatch.DispatcherContext;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 /**
  * SALoadbalanceEndpoint supports session affinity based load balancing. Each of this endpoint
@@ -55,443 +51,281 @@
  * send(...) method of that endpoint. If not it will find an endpoint using the load balancing
  * policy and send to that endpoint.
  */
-public class SALoadbalanceEndpoint implements Endpoint {
-
-    private static final Log log = LogFactory.getLog(SALoadbalanceEndpoint.class);
-
-    private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
-    public static final String ENDPOINT_LIST = "endpointList";
-    public static final String ROOT_ENDPOINT = "rootendpoint";
-    public static final String ENDPOINT_NAME_LIST = "endpointNameList";
-    public static final String WARN_MESSAGE = "In a clustering environment, the endpoint " +
-            "name should be specified even for anonymous endpoints. Otherwise the clustering " +
-            "would not function properly, if there are more than one anonymous endpoints.";
-
-    /**
-     * Name of the endpoint. Used for named endpoints which can be referred using the key attribute
-     * of indirect endpoints.
-     */
-    private String name = null;
-
-    /**
-     * List of endpoints among which the load is distributed. Any object implementing the Endpoint
-     * interface could be used.
-     */
-    private List<Endpoint> endpoints = null;
-
-    /**
-     * Algorithm used for selecting the next endpoint to direct the first request of sessions.
-     * Default is RoundRobin.
-     */
-    private LoadbalanceAlgorithm algorithm = null;
-
-    /**
-     * Parent endpoint of this endpoint if this used inside another endpoint. Although any endpoint
-     * can be the parent, only SALoadbalanceEndpoint should be used here. Use of any other endpoint
-     * would invalidate the session.
-     */
-    private Endpoint parentEndpoint = null;
-
+public class SALoadbalanceEndpoint extends LoadbalanceEndpoint {
+  
     /**
      * Dispatcher used for session affinity.
      */
     private Dispatcher dispatcher = null;
 
-    /**
-     * The dispatcher context, place holder for keeping any runtime states that are used when
-     * finding endpoint for the session
-     */
-    private final DispatcherContext dispatcherContext = new DispatcherContext();
-
-    /**
-     * The endpoint context, place holder for keeping any runtime states related to the endpoint
-     */
-    private final EndpointContext endpointContext = new EndpointContext();
-
-    /**
-     * The algorithm context, place holder for keeping any runtime states related to the load
-     * balance algorithm
-     */
-    private final AlgorithmContext algorithmContext = new AlgorithmContext();
-
-
-    public void send(MessageContext synMessageContext) {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Start : Session Affinity Load-balance Endpoint " + name);
-        }
 
-        boolean isClusteringEnable = false;
-        // get Axis2 MessageContext and ConfigurationContext
-        org.apache.axis2.context.MessageContext axisMC =
-                ((Axis2MessageContext) synMessageContext).getAxis2MessageContext();
-        ConfigurationContext cc = axisMC.getConfigurationContext();
-
-        //The check for clustering environment
-        ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
-        if (clusterManager != null &&
-                clusterManager.getContextManager() != null) {
-            isClusteringEnable = true;
-        }
-
-        String endpointName = this.getName();
-        if (endpointName == null) {
-            if (isClusteringEnable) {
-                log.warn(WARN_MESSAGE);
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("Using the name for the anonymous endpoint as : '"
-                        + SynapseConstants.ANONYMOUS_ENDPOINT + "'");
-            }
-            endpointName = SynapseConstants.ANONYMOUS_ENDPOINT;
-        }
+    /* Sessions time out interval*/
+    private long sessionTimeout = -1;
 
-        if (isClusteringEnable) {
+    public void init(ConfigurationContext cc) {
 
-            // if this is a cluster environment, then set configuration context to endpoint context
-            if (endpointContext.getConfigurationContext() == null) {
+        if (!initialized) {
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting the ConfigurationContext to " +
-                            "the EndpointContext with the name " + endpointName +
-                            " for replicating data on the cluster");
-                }
-                endpointContext.setConfigurationContext(cc);
-                endpointContext.setContextID(endpointName);
+            super.init(cc);
+            // Initialize the SAL Sessions if already has not been initialized.
+            SALSessions salSessions = SALSessions.getInstance();
+            if (!salSessions.isInitialized()) {
+                salSessions.initialize(isClusteringEnabled, cc);
             }
 
-            // if this is a cluster environment, then set configuration context to load balance
-            //  algorithm context
-            if (algorithmContext.getConfigurationContext() == null) {
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting the ConfigurationContext to " +
-                            "the AlgorithmContext with the name " + endpointName +
-                            " for replicating data on the cluster");
-                }
-                algorithmContext.setConfigurationContext(cc);
-                algorithmContext.setContextID(endpointName);
+            //For each root level SAL endpoints , all children are registered 
+            // This is for cluttering as in clustering only endpoint names are replicated 
+            // and it needs way to pick endpoints by name
+            if (isClusteringEnabled && (this.getParentEndpoint() == null ||
+                    !(this.getParentEndpoint() instanceof SALoadbalanceEndpoint))) {
+                SALSessions.getInstance().registerChildren(this, getChildren());
             }
 
-            // if this is a cluster environment, then set configuration context to session based
-            // endpoint dispatcher
-            if (dispatcherContext.getConfigurationContext() == null) {
+        }
+    }
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting the ConfigurationContext to " +
-                            "the DispatcherContext with the name " + endpointName +
-                            " for replicating data on the cluster");
-                }
-                dispatcherContext.setConfigurationContext(cc);
-                dispatcherContext.setContextID(endpointName);
+    public void send(MessageContext synCtx) {
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting the endpoints to the DispatcherContext : " + endpoints);
-                }
-                dispatcherContext.setEndpoints(endpoints);
-            }
+        if (log.isDebugEnabled()) {
+            log.debug("Start : Session Affinity Load-balance Endpoint " + getName());
         }
-
         // first check if this session is associated with a session. if so, get the endpoint
         // associated for that session.
-        Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
-        if (endpoint == null) {
-
-            // there is no endpoint associated with this session. get a new endpoint using the
-            // load balance policy.
-            endpoint = algorithm.getNextEndpoint(synMessageContext, algorithmContext);
 
-            // this is a start of a new session. so update session map.
-            if (dispatcher.isServerInitiatedSession()) {
+        SessionInformation sessionInformation =
+                (SessionInformation) synCtx.getProperty(
+                        SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
 
-                if (log.isDebugEnabled()) {
-                    log.debug("Adding a new server initiated session for the current message");
-                }
+        List<Endpoint> endpoints = (List<Endpoint>) synCtx.getProperty(
+                SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_ENDPOINT_LIST);
 
-                // add this endpoint to the endpoint sequence of operation context.
-                Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synMessageContext;
-                OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
-
-                if (isClusteringEnable) {
-                    // If running on a cluster keep endpoint names, because it is heavy to
-                    // replicate endpoint itself
-
-                    Object o = opCtx.getPropertyNonReplicable(ENDPOINT_NAME_LIST);
-                    List<String> epNameList;
-                    if (o instanceof List) {
-                        epNameList = (List<String>) o;
-                        epNameList.add(endpointName);
-                    } else {
-                        // this is the first endpoint in the heirachy. so create the queue and
-                        // insert this as the first element.
-                        epNameList = new ArrayList<String>();
-                        epNameList.add(endpointName);
-                        opCtx.setNonReplicableProperty(ROOT_ENDPOINT, this);
-                    }
-                    
-                    // if the next endpoint is not a session affinity one, endpoint sequence ends
-                    // here. but we have to add the next endpoint to the list.
-                    if (!(endpoint instanceof SALoadbalanceEndpoint)) {
-
-                        String name;
-                        if (endpoint instanceof IndirectEndpoint) {
-                            name = ((IndirectEndpoint) endpoint).getKey();
-                        } else {
-                            name = endpoint.getName();
-                        }
+        if (sessionInformation == null && endpoints == null) {
 
-                        if (name == null) {
-                            log.warn(WARN_MESSAGE);
-                            name = SynapseConstants.ANONYMOUS_ENDPOINT;
-                        }
-                        epNameList.add(name);
-                    }
+            sessionInformation = dispatcher.getSession(synCtx);
+            if (sessionInformation != null) {
 
-                    if (log.isDebugEnabled()) {
-                        log.debug("Operating on a cluster. Setting the endpoint name list to " +
-                                "the OperationContext : " + epNameList);
-                    }
-                    opCtx.setProperty(ENDPOINT_NAME_LIST, epNameList);
-
-                } else {
-                    
-                    Object o = opCtx.getProperty(ENDPOINT_LIST);
-                    List<Endpoint> endpointList;
-                    if (o instanceof List) {
-                        endpointList = (List<Endpoint>) o;
-                        endpointList.add(this);
-                    } else {
-                        // this is the first endpoint in the heirachy. so create the queue and
-                        // insert this as the first element.
-                        endpointList = new ArrayList<Endpoint>();
-                        endpointList.add(this);
-                        opCtx.setProperty(ENDPOINT_LIST, endpointList);
-                    }
-                    
-                    // if the next endpoint is not a session affinity one, endpoint sequence ends
-                    // here. but we have to add the next endpoint to the list.
-                    if (!(endpoint instanceof SALoadbalanceEndpoint)) {
-                        endpointList.add(endpoint);
-                    }
+                if (log.isDebugEnabled()) {
+                    log.debug("Current session id : " + sessionInformation.getId());
                 }
-
-            } else {
-                dispatcher.updateSession(synMessageContext, dispatcherContext, endpoint);
-            }
-
-            // this is the first request. so an endpoint has not been bound to this session and we
-            // are free to failover if the currently selected endpoint is not working. but for
-            // failover to work, we have to build the soap envelope.
-            synMessageContext.getEnvelope().build();
-
-            // we should also indicate that this is the first message in the session. so that
-            // onFault(...) method can resend only the failed attempts for the first message.
-            synMessageContext.setProperty(FIRST_MESSAGE_IN_SESSION, Boolean.TRUE);
-        }
-
-        if (endpoint != null) {
-
-            // endpoints given by session dispatchers may not be active. therefore, we have check
-            // it here.
-            if (endpoint.isActive(synMessageContext)) {
+                endpoints =
+                        dispatcher.getEndpoints(sessionInformation);
                 if (log.isDebugEnabled()) {
-                    log.debug("Using the endpoint on the session with "
-                            + ((endpoint instanceof IndirectEndpoint) ? "key : "
-                            + ((IndirectEndpoint) endpoint).getKey() : "name : "
-                            + endpoint.getName()) + " for sending the message");
+                    log.debug("Endpoint sequence (path) on current session : " + this + endpoints);
                 }
-                endpoint.send(synMessageContext);
-            } else {
-                informFailure(synMessageContext);
+
+                synCtx.setProperty(
+                        SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_ENDPOINT_LIST, endpoints);
+                // This is for reliably recovery any session information if while response is getting ,
+                // session information has been removed by cleaner.
+                // This will not be a cost as  session information a not heavy data structure
+                synCtx.setProperty(
+                        SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, sessionInformation);
             }
+        }
 
+        if (sessionInformation != null && endpoints != null) {
+            //send message on current session
+            sendMessageOnCurrentSession(sessionInformation.getId(), endpoints, synCtx);
         } else {
-
-            // all child endpoints have failed. so mark this also as failed.
-            if (log.isDebugEnabled()) {
-                log.debug("Marking the Endpoint as failed, " +
-                        "because all child endpoints has been failed");
-            }
-            setActive(false, synMessageContext);
-            informFailure(synMessageContext);
+            // prepare for a new session 
+            sendMessageOnNewSession(synCtx);
         }
+    }  
+
+    public Dispatcher getDispatcher() {
+        return dispatcher;
+    }
+
+    public void setDispatcher(Dispatcher dispatcher) {
+        this.dispatcher = dispatcher;
     }
 
     /**
-     * This will be called for the response of the first message of each server initiated session.
+     * It is logically incorrect to failover a session affinity endpoint after the session has started.
+     * If we redirect a message belonging to a particular session, new endpoint is not aware of the
+     * session. So we can't handle anything more at the endpoint level. Therefore, this method just
+     * deactivate the failed endpoint and give the fault to the next fault handler.
+     * <p/>
+     * But if the session has not started (i.e. first message), the message will be resend by binding
+     * it to a different endpoint.
      *
-     * @param responseMsgCtx
-     * @param endpointList
-     * @param isClusteringEnable
+     * @param endpoint          Failed endpoint.
+     * @param synCtx MessageContext of the failed message.
      */
-    public void updateSession(MessageContext responseMsgCtx, List endpointList,
-                              boolean isClusteringEnable) {
+    public void onChildEndpointFail(Endpoint endpoint, MessageContext synCtx) {
 
-        Endpoint endpoint = null;
+        Object o = synCtx.getProperty(
+                SynapseConstants.PROP_SAL_ENDPOINT_FIRST_MESSAGE_IN_SESSION);
 
-        if (isClusteringEnable) {
-            // if this is a clustering env. only keep endpoint names, because, it is heavy to
-            // replicate endpoint itself
-            String epNameObj = (String) endpointList.remove(0);
-            for (Endpoint ep : endpoints) {
-                if (ep != null) {
-
-                    String name;
-                    if (ep instanceof IndirectEndpoint) {
-                        name = ((IndirectEndpoint) ep).getKey();
-                    } else {
-                        name = ep.getName();
-                    }
+        if (o != null && Boolean.TRUE.equals(o)) {
 
-                    if (name != null && name.equals(epNameObj)) {
-                        endpoint = ep;
-                        break;
+            // this is the first message. so unbind the session with failed endpoint and start
+            // new one by resending.
+            
+            dispatcher.unbind(synCtx);
+            
+            // As going to be happened retry , we have to remove states related to the previous try 
+            
+            Object epListObj = synCtx.getProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+            if (epListObj instanceof List) {
+                List<Endpoint> endpointList = (List<Endpoint>) epListObj;
+                if (!endpointList.isEmpty()) {
+                    if (endpointList.get(0) == this) {
+                        endpointList.clear();
+                    } else {
+                        if (endpointList.contains(this)) {
+                            int lastIndex = endpointList.indexOf(this);
+                            List<Endpoint> head = 
+                                    endpointList.subList(lastIndex , endpointList.size());       
+                            head.clear();
+                        }
                     }
                 }
             }
+            
+            send(synCtx);
 
         } else {
-            endpoint = (Endpoint) endpointList.remove(0);
-        }
-
-        if (endpoint != null) {
-
-            dispatcher.updateSession(responseMsgCtx, dispatcherContext, endpoint);
-            if (endpoint instanceof SALoadbalanceEndpoint) {
-                ((SALoadbalanceEndpoint) endpoint).updateSession(
-                        responseMsgCtx, endpointList, isClusteringEnable);
-            }
+            // session has already started. we can't failover.
+            informFailure(synCtx, SynapseConstants.ENDPOINT_SAL_FAILED_SESSION,
+                    "Failure an endpoint " + endpoint + "  in the  current session");
         }
     }
 
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name.trim();
-    }
-
-    public LoadbalanceAlgorithm getAlgorithm() {
-        return algorithm;
-    }
-
-    public void setAlgorithm(LoadbalanceAlgorithm algorithm) {
-        this.algorithm = algorithm;
-    }
-
-    /**
-     * This is active in below conditions:
-     * If a session is not started AND at least one child endpoint is active.
-     * If a session is started AND the binding endpoint is active.
-     * <p/>
-     * This is not active for all other conditions.
-     *
-     * @param synMessageContext MessageContext of the current message. This is used to determine the
-     *                          session.
-     * @return true is active. false otherwise.
+    /*
+    * Helper method  that send message on the endpoint sequence on the current session
      */
-    public boolean isActive(MessageContext synMessageContext) {
-        // todo: implement above
-        boolean active;
-        Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
-        if (endpoint == null) { // If a session is not started
-            active = endpointContext.isActive();
-            if (!active && endpoints != null) {
-                for (Endpoint ep : endpoints) {
-                    if (ep != null) {
-                        active = ep.isActive(synMessageContext);
-                        if (active) {    //AND at least one child endpoint is active
-                            endpointContext.setActive(active);
-                            // don't break the loop though we found one active endpoint. calling isActive()
-                            // on all child endpoints will update their active state. so this is a good
-                            // time to do that.
+    private void sendMessageOnCurrentSession(String sessionID, List<Endpoint> endpoints, MessageContext synCtx) {
+                
+        // get the next endpoint in the endpoint sequence
+        Endpoint endpoint = null;
+
+        boolean invalidSequence = false;
+        if (endpoints.isEmpty()) {
+            invalidSequence = true;
+        } else {
+            if (endpoints.contains(this)) {
+                // This situation will come only if this endpoint is referred as an indirect endpoint.
+                //  All the path before this SAL endpoint are ignored.
+                int length = endpoints.size();
+                if (length > 1) {
+                    
+                    int beginIndex = endpoints.lastIndexOf(this) + 1;
+                    if (beginIndex == length) {
+                        invalidSequence = true;
+                    } else {
+                        endpoints = endpoints.subList(beginIndex, length);
+                        if (!endpoints.isEmpty()) {
+                            endpoint = endpoints.remove(0);
+                        } else {
+                            invalidSequence = true;
                         }
                     }
+                } else {
+                    invalidSequence = true;
                 }
-            }
-        } else {
-            //If a session is started AND the binding endpoint is active.
-            active = endpoint.isActive(synMessageContext);
-            if (active) {
-                endpointContext.setActive(active);
+
+            } else {
+                endpoint = endpoints.remove(0);
             }
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("SALoadbalanceEndpoint with name '" + getName() + "' is in "
-                    + (active ? "active" : "inactive") + " state");
+        if (invalidSequence) {
+            informFailure(synCtx, SynapseConstants.ENDPOINT_SAL_INVALID_PATH,
+                    "Invalid endpoint sequence " + endpoints + " for session with id " + sessionID);
+            return;
+        }
+        // endpoints given by session dispatchers may not be active. therefore, we have check
+        // it here.
+        if (endpoint != null && endpoint.readyToSend()) {
+            if (log.isDebugEnabled()) {
+                log.debug("Using the endpoint " + endpoint + " for sending the message");
+            }
+            synCtx.pushFaultHandler(this);
+            endpoint.send(synCtx);
+        } else {
+            informFailure(synCtx, SynapseConstants.ENDPOINT_SAL_NOT_READY,
+                    "The endpoint " + endpoint + " on the session with id " +
+                            sessionID + " is not ready.");
         }
-
-        return active;
     }
 
-    public void setActive(boolean active, MessageContext synMessageContext) {
-        endpointContext.setActive(active);
-    }
+    /*
+     * Helper method that send message hoping to establish new session 
+     */
+    private void sendMessageOnNewSession(MessageContext synCtx) {
 
-    public List<Endpoint> getEndpoints() {
-        return endpoints;
-    }
+        // there is no endpoint associated with this session. get a new endpoint using the
+        // load balance policy.
+        Endpoint endpoint = getNextChild(synCtx);
+        if (endpoint == null) {
 
-    public void setEndpoints(List<Endpoint> endpoints) {
-        this.endpoints = endpoints;
-    }
+            informFailure(synCtx, SynapseConstants.ENDPOINT_LB_NONE_READY,
+                    "SLALoadbalance endpoint : " + getName() + " - no ready child endpoints");
+        } else {
 
-    public void setParentEndpoint(Endpoint parentEndpoint) {
-        this.parentEndpoint = parentEndpoint;
-    }
+            prepareEndPointSequence(synCtx, endpoint);
 
-    public Dispatcher getDispatcher() {
-        return dispatcher;
-    }
+            // this is the first request. so an endpoint has not been bound to this session and we
+            // are free to failover if the currently selected endpoint is not working. but for
+            // failover to work, we have to build the soap envelope.
+            synCtx.getEnvelope().build();
 
-    public void setDispatcher(Dispatcher dispatcher) {
-        this.dispatcher = dispatcher;
+            // we should also indicate that this is the first message in the session. so that
+            // onFault(...) method can resend only the failed attempts for the first message.
+            synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_FIRST_MESSAGE_IN_SESSION, Boolean.TRUE);
+            synCtx.pushFaultHandler(this);
+            endpoint.send(synCtx);
+        }
     }
 
-    /**
-     * It is logically incorrect to failover a session affinity endpoint after the session has started.
-     * If we redirect a message belonging to a particular session, new endpoint is not aware of the
-     * session. So we can't handle anything more at the endpoint level. Therefore, this method just
-     * deactivate the failed endpoint and give the fault to the next fault handler.
-     * <p/>
-     * But if the session has not started (i.e. first message), the message will be resend by binding
-     * it to a different endpoint.
-     *
-     * @param endpoint          Failed endpoint.
-     * @param synMessageContext MessageContext of the failed message.
-     */
-    public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
+    private void informFailure(MessageContext synCtx, int errorCode, String errorMsg) {
 
-        Object o = synMessageContext.getProperty(FIRST_MESSAGE_IN_SESSION);
+        if (synCtx.getProperty(SynapseConstants.LAST_ENDPOINT) == null) {
+            synCtx.setProperty(SynapseConstants.ERROR_CODE, errorCode);
+            synCtx.setProperty(SynapseConstants.ERROR_MESSAGE, errorMsg);
+            synCtx.setProperty(SynapseConstants.ERROR_DETAIL, errorMsg);
+            synCtx.setProperty(SynapseConstants.ERROR_EXCEPTION, null);
+        }
+        super.onFault(synCtx);
+    }
 
-        if (o != null && Boolean.TRUE.equals(o)) {
+    /*
+    * Preparing the endpoint sequence for a new session establishment request
+    */
+    private void prepareEndPointSequence(MessageContext synCtx, Endpoint endpoint) {
 
-            // this is the first message. so unbind the sesion with failed endpoint and start
-            // new one by resending.
-            dispatcher.unbind(synMessageContext, dispatcherContext);
-            send(synMessageContext);
+        Object o = synCtx.getProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST);
+        List<Endpoint> endpointList;
+        if (o instanceof List) {
+            endpointList = (List<Endpoint>) o;
+            endpointList.add(this);
 
         } else {
-
-            // session has already started. we can't failover.
-            informFailure(synMessageContext);
+            // this is the first endpoint in the hierarchy. so create the queue and
+            // insert this as the first element.
+            endpointList = new ArrayList<Endpoint>();
+            endpointList.add(this);
+            synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_ENDPOINT_LIST, endpointList);
+            synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_DISPATCHER, dispatcher);
         }
-    }
-
-    private void informFailure(MessageContext synMessageContext) {
 
-        log.warn("Failed to send using the selected endpoint, becasue it is inactive");
-        
-        if (parentEndpoint != null) {
-            parentEndpoint.onChildEndpointFail(this, synMessageContext);
-        } else {
-            Object o = synMessageContext.getFaultStack().pop();
-            if (o != null) {
-                ((FaultHandler) o).handleFault(synMessageContext);
+        // if the next endpoint is not a session affinity one, endpoint sequence ends
+        // here. but we have to add the next endpoint to the list.
+        if (!(endpoint instanceof SALoadbalanceEndpoint)) {
+            endpointList.add(endpoint);
+            // Clearing out if there any any session information with current message 
+            if (dispatcher.isServerInitiatedSession()) {
+                dispatcher.removeSessionID(synCtx);
             }
         }
     }
 
+    public long getSessionTimeout() {
+        return sessionTimeout;
+    }
+
+    public void setSessionTimeout(long sessionTimeout) {
+        this.sessionTimeout = sessionTimeout;
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java Tue Oct  7 11:38:44 2008
@@ -22,154 +22,38 @@
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.synapse.FaultHandler;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.core.axis2.Axis2MessageContext;
-import org.apache.synapse.endpoints.utils.EndpointDefinition;
-
-import java.util.Stack;
 
 /**
- * WSDLEndpoint represents the endpoints built using a wsdl document. It stores the details about
- * the endpoint in a EndpointDefinition object. Once the WSDLEndpoint object is constructed, it
- * should not access the wsdl document at runtime to obtain endpoint information. If it is necessary
- * to create an endpoint using a dynamic wsdl, store the endpoint configuration in the registry and
- * create a dynamic wsdl endpoint using that registry key.
+ * WSDLEndpoint represents the endpoints built using a WSDL document. It stores the details about
+ * the endpoint in an EndpointDefinition object. Once the WSDLEndpoint object is constructed, it
+ * should not access the WSDL document at runtime to obtain endpoint information. If it is necessary
+ * to create an endpoint using a dynamic WSDL, store the endpoint configuration in the registry and
+ * create a dynamic WSDL endpoint using that registry key.
  * <p/>
  * TODO: This should allow various policies to be applied on fine grained level (e.g. operations).
  */
-public class WSDLEndpoint extends FaultHandler implements Endpoint {
-
-    private static final Log log = LogFactory.getLog(WSDLEndpoint.class);
-    private static final Log trace = LogFactory.getLog(SynapseConstants.TRACE_LOGGER);
+public class WSDLEndpoint extends AbstractEndpoint {
 
-    private String name = null;
     private String wsdlURI;
     private OMElement wsdlDoc;
     private String serviceName;
     private String portName;
 
-    private Endpoint parentEndpoint = null;
-    private EndpointDefinition endpoint = null;
-
-    /**
-     * The endpoint context , place holder for keep any runtime states related to the endpoint
-     */
-    private final EndpointContext endpointContext = new EndpointContext();
-
-    /**
-     * Sends the message through this endpoint. This method just handles statistics related
-     * functions and gives the message to the Synapse environment to send. It does not add any
-     * endpoint specific details to the message context. These details are added only to the cloned
-     * message context by the Axis2FlexibleMepClient. So that we can reuse the original message
-     * context for resending through different endpoints.
-     *
-     * @param synCtx MessageContext sent by client to Synapse
-     */
-    public void send(MessageContext synCtx) {
-
-        boolean traceOn = isTraceOn(synCtx);
-        boolean traceOrDebugOn = isTraceOrDebugOn(traceOn);
-
-        if (traceOrDebugOn) {
-            traceOrDebug(traceOn, "Start : WSDL Endpoint");
-
-            if (traceOn && trace.isTraceEnabled()) {
-                trace.trace("Message : " + synCtx.getEnvelope());
-            }
-        }
-
-        if (endpoint.getAddress() != null) {
-
-            String eprAddress = endpoint.getAddress();
-            boolean isClusteringEnable = false;
-            // get Axis2 MessageContext and ConfigurationContext
-            org.apache.axis2.context.MessageContext axisMC =
-                    ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-            ConfigurationContext cc = axisMC.getConfigurationContext();
-
-            //The check for clustering environment
-            ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
-            if (clusterManager != null &&
-                    clusterManager.getContextManager() != null) {
-                isClusteringEnable = true;
-            }
-
-            String endPointName = this.getName();
-            if (endPointName == null) {
-
-                if (traceOrDebugOn && isClusteringEnable) {
-                    log.warn("In a clustering environment , the endpoint  name should be " +
-                            "specified even for anonymous endpoints. Otherwise, the clustering " +
-                            "would not be functioned correctly if there are more than one " +
-                            "anonymous endpoints. ");
-                }
-                endPointName = SynapseConstants.ANONYMOUS_ENDPOINT;
-            }
-
-            if (isClusteringEnable) {
-                // if this is a cluster environment , then set configuration context
-                // to endpoint context
-                if (endpointContext.getConfigurationContext() == null) {
-                    endpointContext.setConfigurationContext(cc);
-                    endpointContext.setContextID(endPointName);
-                }
-            }
-
-            // Setting Required property to collect the End Point audit
-           
-            if (traceOrDebugOn) {
-                traceOrDebug(traceOn, "Sending message to WSDL endpoint : " +
-                        endPointName + " resolves to address = " + eprAddress);
-                traceOrDebug(traceOn, "SOAPAction: " + (synCtx.getSoapAction() != null ?
-                        synCtx.getSoapAction() : "null"));
-                traceOrDebug(traceOn, "WSA-Action: " + (synCtx.getWSAAction() != null ?
-                        synCtx.getWSAAction() : "null"));
-
-                if (traceOn && trace.isTraceEnabled()) {
-                    trace.trace("Envelope : \n" + synCtx.getEnvelope());
-                }
-            }
-
-            // register this as the immediate fault handler for this message.
-            synCtx.pushFaultHandler(this);
-
-            // add this as the last endpoint to process this message. it is used by audit code.
-            synCtx.setProperty(SynapseConstants.PROCESSED_ENDPOINT, this);
-
-            synCtx.getEnvironment().send(endpoint, synCtx);
-        }
-    }
-
     public void onFault(MessageContext synCtx) {
-        // perform retries here
-
-        // if this endpoint has actually failed, inform the parent.
-        setActive(false, synCtx);
-
-        if (parentEndpoint != null) {
-            parentEndpoint.onChildEndpointFail(this, synCtx);
-        } else {
-            Stack faultStack = synCtx.getFaultStack();
-            if (!faultStack.isEmpty()) {
-                ((FaultHandler) faultStack.pop()).handleFault(synCtx);
-            }
+        // is this really a fault or a timeout/connection close etc?
+        if (isTimeout(synCtx)) {
+            getContext().onTimeout();
+        } else if (isSuspendFault(synCtx)) {
+            getContext().onFault();
         }
+        super.onFault(synCtx);
     }
 
-    public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
-        // WSDLEndpoint does not contain any child endpoints. So this method will never be called.
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name.trim();
+    public void onSuccess() {
+        getContext().onSuccess();
     }
 
     public String getWsdlURI() {
@@ -203,103 +87,4 @@
     public void setPortName(String portName) {
         this.portName = portName;
     }
-
-    /**
-     * Checks if the endpoint is active (failed or not). If endpoint is in failed state and
-     * suspendOnFailDuration has elapsed, it will be set to active.
-     *
-     * @param synMessageContext MessageContext of the current message. This is not used here.
-     * @return true if endpoint is active. false otherwise.
-     */
-    public boolean isActive(MessageContext synMessageContext) {
-        boolean active = endpointContext.isActive();
-        if (!active) {
-            long recoverOn = endpointContext.getRecoverOn();
-            if (System.currentTimeMillis() > recoverOn) {
-                active = true;
-                endpointContext.setActive(true);
-            }
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("WSDLEndpoint with name '" + name + "' is in "
-                    + (active ? "active" : "inactive") + " state");
-        }
-
-        return active;
-    }
-
-    /**
-     * Sets if endpoint active or not. if endpoint is set as failed (active = false), the recover on
-     * time is calculated so that it will be activated after the recover on time.
-     *
-     * @param active            true if active. false otherwise.
-     * @param synMessageContext MessageContext of the current message. This is not used here.
-     */
-    public void setActive(boolean active, MessageContext synMessageContext) {
-
-        if (!active) {
-            if (endpoint.getSuspendOnFailDuration() != -1) {
-                // Calculating a new value by adding suspendOnFailDuration to current time.
-                // as the endpoint is set as failed
-                endpointContext.setRecoverOn(
-                        System.currentTimeMillis() + endpoint.getSuspendOnFailDuration());
-            } else {
-                endpointContext.setRecoverOn(Long.MAX_VALUE);
-            }
-        }
-
-        endpointContext.setActive(true);
-    }
-
-    public void setParentEndpoint(Endpoint parentEndpoint) {
-        this.parentEndpoint = parentEndpoint;
-    }
-
-    public EndpointDefinition getEndpoint() {
-        return endpoint;
-    }
-
-    public void setEndpoint(EndpointDefinition endpoint) {
-        this.endpoint = endpoint;
-    }
-
-    /**
-     * Should this mediator perform tracing? True if its explicitly asked to
-     * trace, or its parent has been asked to trace and it does not reject it
-     *
-     * @param msgCtx the current message
-     * @return true if tracing should be performed
-     */
-    protected boolean isTraceOn(MessageContext msgCtx) {
-        return
-                (endpoint.getTraceState() == SynapseConstants.TRACING_ON) ||
-                        (endpoint.getTraceState() == SynapseConstants.TRACING_UNSET &&
-                                msgCtx.getTracingState() == SynapseConstants.TRACING_ON);
-    }
-
-    /**
-     * Is tracing or debug logging on?
-     *
-     * @param isTraceOn is tracing known to be on?
-     * @return true, if either tracing or debug logging is on
-     */
-    protected boolean isTraceOrDebugOn(boolean isTraceOn) {
-        return isTraceOn || log.isDebugEnabled();
-    }
-
-    /**
-     * Perform Trace and Debug logging of a message @INFO (trace) and DEBUG (log)
-     *
-     * @param traceOn is runtime trace on for this message?
-     * @param msg     the message to log/trace
-     */
-    protected void traceOrDebug(boolean traceOn, String msg) {
-        if (traceOn) {
-            trace.info(msg);
-        }
-        if (log.isDebugEnabled()) {
-            log.debug(msg);
-        }
-    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/AlgorithmContext.java Tue Oct  7 11:38:44 2008
@@ -18,73 +18,57 @@
 */
 package org.apache.synapse.endpoints.algorithms;
 
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.context.Replicator;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.SynapseException;
-import org.apache.synapse.util.Replicator;
 
 /**
- * Keeps the states of the load balance algorithm.This hides where those states are kept.For a
- * cluster environment ,all states are kept in the axis2 configuration context in order to replicate
- * those states so that other synapse instance in the same cluster can see those changes .
- * This class can be evolved to keep any run time states related to the endpoint .
- * For a non-clustered environment , all data are kept locally.
- * <p/>
- * This class provide the abstraction need to separate the dynamic data from the static data
- * and improve the  high cohesion and provides capability to replicate only required state at
- * a given time. This improves the performance when replicate data.
+ * Keeps the runtime state of the algorithm
  */
 public class AlgorithmContext {
 
     private static final Log log = LogFactory.getLog(AlgorithmContext.class);
 
-    /* The  static constant only for construct key prefix for each property in a dispatcher context
-     * as it is need when those property state going to replicate in a cluster env. */
-    private static final String UNDERSCORE_STRING = "_";
-    private static final String CURRENT_EPR = "currentEPR";
-
-    /* The axis configuration context-  this will hold the all callers states
-     * when doing throttling in a clustered environment. */
-    private ConfigurationContext configCtx;
-
-    /* Is this env. support clustering*/
-    private boolean isClusteringEnable = false;
-
-    /* The key for 'currentEPR' attribute and this is used when this attribute value being
-     * replicated */
-    private String currentEPRPropertyKey;
+    private static final String KEY_PREFIX = "synapse.endpoint.lb.algorithm.";
+    private static final String CURRENT_EPR = ".current_epr";
+
+    /* The axis2 configuration context - this hold state in a clustered environment. */
+    private ConfigurationContext cfgCtx;
+
+    /* Are we supporting clustering ? */
+    private Boolean isClusteringEnabled = null;
+
+    /* The key for 'currentEPR' attribute when replicated in a clsuter */
+    private String CURRENT_EPR_PROP_KEY;
 
     /* The pointer to current epr - The position of the current EPR */
     private int currentEPR = 0;
 
+    public AlgorithmContext(boolean clusteringEnabled, ConfigurationContext cfgCtx, String endpointName) {
+        this.cfgCtx = cfgCtx;
+        if (clusteringEnabled) {
+            isClusteringEnabled = Boolean.TRUE;
+        }
+        CURRENT_EPR_PROP_KEY = KEY_PREFIX + endpointName + CURRENT_EPR;
+    }
+
     /**
-     * To get the  position of the current EPR
-     * If there is no value and if there will not appear any errors , then '0' will be returned.
+     * To get the position of the current EPR for use. Default to 0 - i.e. first endpoint
      *
      * @return The  position of the current EPR
      */
     public int getCurrentEndpointIndex() {
 
-        if (this.isClusteringEnable) {  // if this is a clustering env.
+        if (Boolean.TRUE.equals(isClusteringEnabled)) {
 
-            if (this.currentEPRPropertyKey == null || "".equals(this.currentEPRPropertyKey)) {
-                handleException("Cannot find the required key to find the " +
-                        "shared state of the 'currentEPR' attribute");
-            }
-
-            Object value = this.configCtx.getPropertyNonReplicable(this.currentEPRPropertyKey);
+            Object value = cfgCtx.getPropertyNonReplicable(this.CURRENT_EPR_PROP_KEY);
             if (value == null) {
                 return 0;
-            }
-            try {
-                if (value instanceof Integer) {
-                    return (Integer) value;
-                } else if (value instanceof String) {
-                    return Integer.parseInt((String) value);
-                }
-            } catch (NumberFormatException e) {
-                handleException("The invalid value for the 'currentEPR' attribute");
+            } else if (value instanceof Integer) {
+                return ((Integer) value);
             }
         } else {
             return currentEPR;
@@ -97,21 +81,18 @@
      *
      * @param currentEPR The current position
      */
-    public void setCurrentEndpointIndex(int currentEPR) {
+    public void setCurrentEPR(int currentEPR) {
 
-        if (isClusteringEnable) {  // if this is a clustering env.
+        if (Boolean.TRUE.equals(isClusteringEnabled)) {
 
-            if (currentEPRPropertyKey != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Setting the current EPR " + currentEPR
-                            + " with the key " + currentEPRPropertyKey);
-                }
-                // Sets the property and  replicates the current state  so that all instances
-                Replicator.setAndReplicateState(currentEPRPropertyKey, currentEPR, configCtx);
+            if (log.isDebugEnabled()) {
+                log.debug("Set EPR with key : " + CURRENT_EPR_PROP_KEY + " as : " + currentEPR);
             }
+            setAndReplicateState(CURRENT_EPR_PROP_KEY, currentEPR);
+
         } else {
             if (log.isDebugEnabled()) {
-                log.debug("Setting the current EPR " + currentEPR);
+                log.debug("Setting the current EPR as : " + currentEPR);
             }
             this.currentEPR = currentEPR;
         }
@@ -123,55 +104,54 @@
      * @return Returns the ConfigurationContext instance
      */
     public ConfigurationContext getConfigurationContext() {
-        return configCtx;
+        return cfgCtx;
     }
 
     /**
-     * Sets the  ConfigurationContext instance . This is only used for cluster env.
-     * By setting this , indicates that this is a cluster env.
+     * Helper methods for handle errors.
      *
-     * @param configCtx The ConfigurationContext instance
+     * @param msg The error message
      */
-    public void setConfigurationContext(ConfigurationContext configCtx) {
-
-        if (configCtx == null) {
-            handleException("The ConfigurationContext cannot be null when system " +
-                    "in a cluster environment");
-        }
-
-        this.configCtx = configCtx;
-        this.isClusteringEnable = true; // Now, the environment is considered as a cluster
+    protected void handleException(String msg) {
+        log.error(msg);
+        throw new SynapseException(msg);
     }
 
     /**
-     * Sets the identifier for this algorithm context , so that , this can be identified
-     * uniquely across the cluster. The id will be the name of the endpoint
+     * Helper methods for handle errors.
      *
-     * @param contextID The Id for this algorithm context
+     * @param msg The error message
+     * @param e   The exception
      */
-    public void setContextID(String contextID) {
-
-        if (contextID == null || "".equals(contextID)) {
-            handleException("The Context ID cannot be null when system in a cluster environment");
-        }
-
-        //Making required key for each property in the algorithm context- Those will be used when
-        //replicating states
-        StringBuffer buffer = new StringBuffer();
-        buffer.append(contextID);
-        buffer.append(UNDERSCORE_STRING);
-        buffer.append(CURRENT_EPR);
-        currentEPRPropertyKey = buffer.toString();
+    protected void handleException(String msg, Exception e) {
+        log.error(msg, e);
+        throw new SynapseException(msg, e);
     }
 
-
     /**
-     * Helper methods for handle errors.
+     * Helper method to replicates states of the property with given key
+     * Sets property and  replicates the current state  so that all instances
+     * across cluster can see this state
      *
-     * @param msg The error message
+     * @param key   The key of the property
+     * @param value The value of the property
      */
-    private void handleException(String msg) {
-        log.error(msg);
-        throw new SynapseException(msg);
+    private void setAndReplicateState(String key, Object value) {
+
+        if (cfgCtx != null && key != null && value != null) {
+
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Replicating property key : " + key + " as : " + value);
+                }
+                cfgCtx.setProperty(key, value);
+                Replicator.replicate(cfgCtx, new String[]{key});
+
+            } catch (ClusteringFault clusteringFault) {
+                handleException("Error replicating property : " + key + " as : " +
+                    value, clusteringFault);
+            }
+        }
     }
+
 }
\ No newline at end of file

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java Tue Oct  7 11:38:44 2008
@@ -36,14 +36,15 @@
      *
      * @param members The application members
      */
-    void setApplicationMembers(List<Member> members);
-
-    /**
-     * Set the endpoints
-     *
-     * @param endpoints The endpoints
-     */
-    void setEndpoints(List<Endpoint> endpoints);
+//    TODO FIX-RUWAN
+//    void setApplicationMembers(List<Member> members);
+//
+//    /**
+//     * Set the endpoints
+//     *
+//     * @param endpoints The endpoints
+//     */
+//    void setEndpoints(List<Endpoint> endpoints);
 
     /**
      * This method returns the next node according to the algorithm implementation.
@@ -63,7 +64,8 @@
      * @param algorithmContext The context in which holds run time states related to the algorithm
      * @return Next application member to which the request has to be sent to
      */
-    Member getNextApplicationMember(AlgorithmContext algorithmContext);
+//    TODO FIX-RUWAN
+//    Member getNextApplicationMember(AlgorithmContext algorithmContext);
 
     /**
      * Resets the algorithm to its initial position. Initial position depends on the implementation.
@@ -71,4 +73,10 @@
      * @param algorithmContext The context in which holds run time states related to the algorithm
      */
     void reset(AlgorithmContext algorithmContext);
+
+    /**
+     * Return the name of the load balancing algorithm
+     * @return the name of the algorithm implemented
+     */
+    public String getName();
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java Tue Oct  7 11:38:44 2008
@@ -19,12 +19,13 @@
 
 package org.apache.synapse.endpoints.algorithms;
 
-import org.apache.axis2.clustering.Member;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.endpoints.Endpoint;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -38,21 +39,9 @@
     /**
      * Endpoints list for the round robin algorithm
      */
-    private List<Endpoint> endpoints = null;
+    private List endpoints = null;
 
-    /**
-     * List of application members in the loadb balance group
-     */
-    private List<Member> members;
-
-    public RoundRobin() {
-    }
-
-    public void setApplicationMembers(List<Member> members) {
-        this.members = members;
-    }
-
-    public void setEndpoints(List<Endpoint> endpoints) {
+    public RoundRobin(List endpoints) {
         this.endpoints = endpoints;
     }
 
@@ -60,16 +49,12 @@
      * Choose an active endpoint using the round robin algorithm. If there are no active endpoints
      * available, returns null.
      *
-     * @param synapseMessageContext MessageContext instance which holds all per-message properties
-     * @param algorithmContext The context in which holds run time states related to the algorithm
+     * @param synCtx MessageContext instance which holds all per-message properties
+     * @param  algorithmContext The context in which holds run time states related to the algorithm
      * @return endpoint to send the next message
      */
-    public Endpoint getNextEndpoint(MessageContext synapseMessageContext,
-                                    AlgorithmContext algorithmContext) {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Using the Round Robin loadbalancing algorithm to select the next endpoint");
-        }
+    public Endpoint getNextEndpoint(MessageContext synCtx,
+        AlgorithmContext algorithmContext) {
 
         Endpoint nextEndpoint;
         int attempts = 0;
@@ -77,53 +62,34 @@
         do {
             // two successive clients could get the same endpoint if not synchronized.
             synchronized (this) {
-                nextEndpoint = endpoints.get(currentEPR);
+                nextEndpoint = (Endpoint) endpoints.get(currentEPR);
 
                 if (currentEPR == endpoints.size() - 1) {
                     currentEPR = 0;
                 } else {
                     currentEPR++;
                 }
-                algorithmContext.setCurrentEndpointIndex(currentEPR);
+                algorithmContext.setCurrentEPR(currentEPR);
             }
 
             attempts++;
             if (attempts > endpoints.size()) {
-                log.warn("Couldn't find an endpoint from the Round Robin loadbalancing algorithm");
                 return null;
             }
 
-        } while (!nextEndpoint.isActive(synapseMessageContext));
+        } while (!nextEndpoint.readyToSend());
 
         return nextEndpoint;
     }
 
-    public Member getNextApplicationMember(AlgorithmContext algorithmContext) {
-        if (members.size() == 0) {
-            return null;
-        }
-        int currentMemberIndex = algorithmContext.getCurrentEndpointIndex();
-        if (currentMemberIndex >= members.size()) {
-            currentMemberIndex = 0;
-        }
-        Member current = members.get(currentMemberIndex);
-        if (currentMemberIndex == members.size() - 1) {
-            currentMemberIndex = 0;
-        } else {
-            currentMemberIndex++;
-        }
-        algorithmContext.setCurrentEndpointIndex(currentMemberIndex);
-        if(log.isDebugEnabled()) {
-            log.debug("Members       : " + members.size());
-            log.debug("Current member: " + current);
-        }
-        return current;
-    }
-
     public void reset(AlgorithmContext algorithmContext) {
         if (log.isDebugEnabled()) {
             log.debug("Resetting the Round Robin loadbalancing algorithm ...");
         }
-        algorithmContext.setCurrentEndpointIndex(0);
+        algorithmContext.setCurrentEPR(0);
+    }
+
+    public String getName() {
+        return "RoundRobin";
     }
 }

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java?rev=702579&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java Tue Oct  7 11:38:44 2008
@@ -0,0 +1,134 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.endpoints.dispatch;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.endpoints.Endpoint;
+
+import javax.xml.namespace.QName;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public abstract class AbstractDispatcher implements Dispatcher {
+
+    protected Log log;
+    private final static String TRANSPORT_HEADERS = "TRANSPORT_HEADERS";
+
+    protected AbstractDispatcher() {
+        log = LogFactory.getLog(this.getClass());
+    }
+
+    public List<Endpoint> getEndpoints(SessionInformation sessionInformation) {
+        return SALSessions.getInstance().getChildEndpoints(sessionInformation);
+    }
+
+    protected String extractSessionID(OMElement header, QName keyQName) {
+
+        OMElement sgcIDElm = getHeaderBlock(header, keyQName);
+
+        if (sgcIDElm != null) {
+            String sgcID = sgcIDElm.getText();
+
+            if (sgcID != null && !"".equals(sgcID)) {
+                return sgcID.trim();
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug(keyQName + " is null or empty");
+                }
+            }
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Couldn't find the " + keyQName + " SOAP header to find the session");
+            }
+        }
+        return null;
+    }
+
+    protected String extractSessionID(MessageContext synCtx, String key) {
+
+        if (key != null) {
+            Map headerMap = getTransportHeaderMap(synCtx);
+            if (headerMap != null) {
+
+                Object cookie = headerMap.get(key);
+
+                if (cookie instanceof String) {
+                    return (String) cookie;
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Couldn't find the " + key + " header to find the session");
+                    }
+                }
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("Couldn't find the TRANSPORT_HEADERS to find the session");
+                }
+
+            }
+        }
+        return null;
+    }
+
+    protected void removeSessionID(MessageContext synCtx, String key) {
+
+        if (key != null) {
+            Map headerMap = getTransportHeaderMap(synCtx);
+            if (headerMap != null) {
+                headerMap.remove(key);
+            }
+        }
+    }
+
+    protected void removeSessionID(OMElement header, QName keyQName) {
+
+        OMElement sgcIDElm = getHeaderBlock(header, keyQName);
+        if (sgcIDElm != null) {
+            sgcIDElm.detach();
+        }
+    }
+
+
+    private Map getTransportHeaderMap(MessageContext synCtx) {
+
+        org.apache.axis2.context.MessageContext axis2MessageContext =
+                ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+
+        Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
+        if (o != null && o instanceof Map) {
+            return (Map) o;
+        }
+        return null;
+    }
+
+    private OMElement getHeaderBlock(OMElement soapHeader, QName keyQName) {
+
+        if (soapHeader != null) {
+            return soapHeader.getFirstChildWithName(keyQName);
+        }
+        return null;
+    }
+}

Propchange: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/AbstractDispatcher.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/Dispatcher.java Tue Oct  7 11:38:44 2008
@@ -22,6 +22,8 @@
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.endpoints.Endpoint;
 
+import java.util.List;
+
 /**
  * Defines the behavior of session dispatchers. There can be two dispatcher types. Server initiated
  * session dispatchers and client initiated session dispatchers. In the former one, server generates
@@ -37,30 +39,25 @@
      * synapseMessageContext is not found it should return null.
      *
      * @param synCtx client -> esb message context.
-     * @param dispatcherContext context for dispatching
      * @return Endpoint Endpoint associated with this session.
      */
-    public Endpoint getEndpoint(MessageContext synCtx, DispatcherContext dispatcherContext);
+    public SessionInformation getSession(MessageContext synCtx);
 
     /**
      * Updates the session maps. This will be called in the first client -> synapse -> server flow
      * for client initiated sessions. For server initiated sessions, this will be called in the
      * first server -> synapse -> client flow.
      *
-     * @param synCtx   SynapseMessageContext
-     * @param dispatcherContext context for dispatching
-     * @param endpoint Selected endpoint for this session.
+     * @param synCtx SynapseMessageContext
      */
-    public void updateSession(MessageContext synCtx, DispatcherContext dispatcherContext,
-        Endpoint endpoint);
+    public void updateSession(MessageContext synCtx);
 
     /**
      * Removes the session belonging to the given message context.
      *
      * @param synCtx MessageContext containing an session ID.
-     * @param dispatcherContext context for dispatching
      */
-    public void unbind(MessageContext synCtx, DispatcherContext dispatcherContext);
+    public void unbind(MessageContext synCtx);
 
     /**
      * Determine whether the session supported by the implementing dispatcher is initiated by the
@@ -69,4 +66,18 @@
      * @return true, if the session is initiated by the server. false, otherwise.
      */
     public boolean isServerInitiatedSession();
+
+    /**
+     * Returns the endpoint sequence associated with current session with out root
+     *
+     * @param sessionInformation Current Session information
+     * @return Endpoint sequence
+     */
+    public List<Endpoint> getEndpoints(SessionInformation sessionInformation);
+
+    /**
+     * Remove the session Id - To clear out session information from current message
+     * @param syCtx MessageContext containing an session ID
+     */
+    public void removeSessionID(MessageContext syCtx);
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java?rev=702579&r1=702578&r2=702579&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/dispatch/HttpSessionDispatcher.java Tue Oct  7 11:38:44 2008
@@ -19,24 +19,17 @@
 
 package org.apache.synapse.endpoints.dispatch;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.synapse.MessageContext;
-import org.apache.synapse.core.axis2.Axis2MessageContext;
-import org.apache.synapse.endpoints.Endpoint;
 
-import java.util.Map;
 
 /**
  * Dispatches sessions based on HTTP cookies. Session is initiated by the server in the first
  * response when it sends "Set-Cookie" HTTP header with the session ID. For all successive messages
  * client should send "Cookie" HTTP header with session ID send by the server.
  */
-public class HttpSessionDispatcher implements Dispatcher {
+public class HttpSessionDispatcher extends AbstractDispatcher {
 
-    private static final Log log = LogFactory.getLog(HttpSessionDispatcher.class);
 
-    private final static String TRANSPORT_HEADERS = "TRANSPORT_HEADERS";
     /*HTTP Headers  */
     private final static String COOKIE = "Cookie";
     private final static String SET_COOKIE = "Set-Cookie";
@@ -49,40 +42,8 @@
      * @param synCtx MessageContext possibly containing a "Cookie" HTTP header.
      * @return Endpoint Server endpoint for the given HTTP session.
      */
-    public Endpoint getEndpoint(MessageContext synCtx, DispatcherContext dispatcherContext) {
-
-        Endpoint endpoint = null;
-
-        org.apache.axis2.context.MessageContext axis2MessageContext =
-                ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
-        Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
-        if (o != null && o instanceof Map) {
-            Map headerMap = (Map) o;
-            Object cookie = headerMap.get(COOKIE);
-
-            if (cookie != null && cookie instanceof String) {
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Using the HTTP header 'Cookie: " + cookie
-                            + "' to retrieve the endpoint in the transport session");
-                }
-
-                Object ep = dispatcherContext.getEndpoint((String) cookie);
-                if (ep != null && ep instanceof Endpoint) {
-                    endpoint = (Endpoint) ep;
-                } else if (log.isDebugEnabled()) {
-                    log.debug("No endpoint found in the transport " +
-                            "session for the session id " + cookie);
-                }
-                
-            } else if (log.isDebugEnabled()) {
-                log.debug("No 'Cookie' HTTP headers found to extract the " +
-                        "endpoint from the transport session");
-            }
-        }
-
-        return endpoint;
+    public SessionInformation getSession(MessageContext synCtx) {
+        return SALSessions.getInstance().getSession(extractSessionID(synCtx, COOKIE));
     }
 
     /**
@@ -90,76 +51,39 @@
      * session ID is not already in the session map update the session map by mapping the cookie
      * to the endpoint.
      *
-     * @param synCtx   MessageContext possibly containing the "Set-Cookie" HTTP header.
-     * @param endpoint Endpoint to be mapped to the session.
+     * @param synCtx MessageContext possibly containing the "Set-Cookie" HTTP header.
      */
-    public void updateSession(MessageContext synCtx, DispatcherContext dispatcherContext,
-        Endpoint endpoint) {
+    public void updateSession(MessageContext synCtx) {
 
-        if (endpoint == null || dispatcherContext == null) {
-            return;
-        }
+        Object cookie = extractSessionID(synCtx, SET_COOKIE);
 
-        org.apache.axis2.context.MessageContext axis2MessageContext =
-                ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+        if (cookie != null && cookie instanceof String) {
 
-        Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
-        if (o != null && o instanceof Map) {
-            Map headerMap = (Map) o;
-            Object cookie = headerMap.get(SET_COOKIE);
-
-            if (cookie != null && cookie instanceof String) {
-                
-                // extract the first name value pair of the Set-Cookie header, which is considered
-                // as the session id which will be sent back from the client with the Cookie header
-                // for example;
-                //      Set-Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE; Path=/
-                // will result in the session id "JSESSIONID=760764CB72E96A7221506823748CF2AE"
-                // and the client is expected to send the Cookie header as;
-                //      Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE
-                if (log.isDebugEnabled()) {
-                    log.debug("Found the HTTP header 'Set-Cookie: "
-                            + cookie + "' for updating the session");
-                }
-                String sessionId = ((String) cookie).split(";")[0];
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Using the session id '" + sessionId +
-                            "' extracted from the Set-Cookie header to update the session " +
-                            "with the endpoint " + endpoint);
-                }
-                dispatcherContext.setEndpoint(sessionId, endpoint);
-                
-            } else if (log.isDebugEnabled()) {
-                log.debug("No 'Set-Cookie' HTTP header is specified in " +
-                        "the message to update the session");
+            // extract the first name value pair of the Set-Cookie header, which is considered
+            // as the session id which will be sent back from the client with the Cookie header
+            // for example;
+            //      Set-Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE; Path=/
+            // will result in the session id "JSESSIONID=760764CB72E96A7221506823748CF2AE"
+            // and the client is expected to send the Cookie header as;
+            //      Cookie: JSESSIONID=760764CB72E96A7221506823748CF2AE
+            if (log.isDebugEnabled()) {
+                log.debug("Found the HTTP header 'Set-Cookie: "
+                        + cookie + "' for updating the session");
             }
-        }
-    }
+            String sessionId = ((String) cookie).split(";")[0];
 
-    public void unbind(MessageContext synCtx, DispatcherContext dispatcherContext) {
-
-        org.apache.axis2.context.MessageContext axis2MessageContext =
-                ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
-        Object o = axis2MessageContext.getProperty(TRANSPORT_HEADERS);
-        if (o != null && o instanceof Map) {
-            Map headerMap = (Map) o;
-            Object cookie = headerMap.get(COOKIE);
-
-            if (cookie != null && cookie instanceof String) {
-                
-                if (log.isDebugEnabled()) {
-                    log.debug("Using the HTTP header 'Cookie: "
-                            + cookie + "' to unbind the session");
-                }
-                dispatcherContext.removeSession((String) cookie);
-                
-            } else if (log.isDebugEnabled()) {
-                log.debug("No 'Cookie' HTTP header is specified in " +
-                        "the message to unbind the session");
+            if (log.isDebugEnabled()) {
+                log.debug("Using the session id '" + sessionId +
+                        "' extracted from the Set-Cookie header ");
             }
+
+            SALSessions.getInstance().updateSession(synCtx, sessionId);
         }
+
+    }
+
+    public void unbind(MessageContext synCtx) {
+        SALSessions.getInstance().removeSession(extractSessionID(synCtx, COOKIE));
     }
 
     /**
@@ -170,4 +94,8 @@
     public boolean isServerInitiatedSession() {
         return true;
     }
+
+    public void removeSessionID(MessageContext syCtx) {
+        removeSessionID(syCtx, COOKIE);
+    }
 }