You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2008/05/09 20:38:05 UTC

svn commit: r654906 - in /synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse: endpoints/ mediators/builtin/

Author: indika
Date: Fri May  9 11:38:05 2008
New Revision: 654906

URL: http://svn.apache.org/viewvc?rev=654906&view=rev
Log:
Fix an issue in the session-affinity load-balance endpoint when the session is started by the server

Modified:
    synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java
    synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java
    synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
    synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
    synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
    synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java

Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/AddressEndpoint.java Fri May  9 11:38:05 2008
@@ -61,10 +61,15 @@
             if (System.currentTimeMillis() > recoverOn) {
                 active = true;
                 endpointContext.setActive(true);
-                endpointContext.setRecoverOn(0);
+                endpointContext.setRecoverOn(0);                       
 
             }
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Endpoint  '" + getName() + "' is in state ' " + active + " '");
+        }
+
         return active;
     }
 

Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/FailoverEndpoint.java Fri May  9 11:38:05 2008
@@ -175,6 +175,10 @@
                 }
             }
         }
+        
+        if (log.isDebugEnabled()) {
+            log.debug("Endpoint  '" + name + "' is in state ' " + active + " '");
+        }
 
         return active;
     }

Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java Fri May  9 11:38:05 2008
@@ -198,6 +198,10 @@
             }
         }
 
+        if (log.isDebugEnabled()) {
+            log.debug("Endpoint  '" + name + "' is in state ' " + active + " '");
+        }
+
         return active;
     }
 

Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/SALoadbalanceEndpoint.java Fri May  9 11:38:05 2008
@@ -62,6 +62,7 @@
 
     private static final String FIRST_MESSAGE_IN_SESSION = "first_message_in_session";
     public static final String ENDPOINT_LIST = "endpointList";
+    public static final String ROOT_ENDPOINT = "rootendpoint";
     public static final String ENDPOINT_NAME_LIST = "endpointNameList";
     private static final String WARN_MESSAGE = "In a clustering environment , the endpoint " +
             " name should be specified" +
@@ -187,73 +188,53 @@
                     //  replicate endpoint itself
 
                     Object o = opCtx.getPropertyNonReplicable(ENDPOINT_NAME_LIST);
-                    if (o != null) {
+                    List epNameList;
+                    if (o instanceof List) {
 
-                        List endpointList = (List) o;
-                        endpointList.add(endPointName);
-
-                        // if the next endpoint is not a session affinity one, endpoint sequence ends
-                        // here. but we have to add the next endpoint to the list.
-                        if (!(endpoint instanceof SALoadbalanceEndpoint)) {
-                            String name = endpoint.getName();
-                            if (name == null) {
-                                log.warn(WARN_MESSAGE);
-                                name = SynapseConstants.ANONYMOUS_ENDPOINT;
-                            }
-                            endpointList.add(name);
-                        }
+                        epNameList = (List) o;
+                        epNameList.add(endPointName);
 
                     } else {
                         // this is the first endpoint in the heirachy. so create the queue and insert
                         // this as the first element.
-                        List endpointList = new ArrayList();
-                        endpointList.add(endPointName);
-
-                        // if the next endpoint is not a session affinity one, endpoint sequence ends
-                        // here. but we have to add the next endpoint to the list.
-                        if (!(endpoint instanceof SALoadbalanceEndpoint)) {
-                            String name = endpoint.getName();
-                            if (name == null) {
-                                log.warn(WARN_MESSAGE);
-                                name = SynapseConstants.ANONYMOUS_ENDPOINT;
-                            }
-                            endpointList.add(name);
-                        }
-
-                        opCtx.setProperty(ENDPOINT_NAME_LIST, endpointList);
+                        epNameList = new ArrayList();
+                        epNameList.add(endPointName);
+                        opCtx.setNonReplicableProperty(ROOT_ENDPOINT,this);
                     }
-
-                }
-
-                Object o = opCtx.getProperty(ENDPOINT_LIST);
-
-                if (o != null) {
-                    List<Endpoint> endpointList = (List<Endpoint>) o;
-                    endpointList.add(this);
-
                     // if the next endpoint is not a session affinity one, endpoint sequence ends
                     // here. but we have to add the next endpoint to the list.
                     if (!(endpoint instanceof SALoadbalanceEndpoint)) {
-                        endpointList.add(endpoint);
+                        String name = endpoint.getName();
+                        if (name == null) {
+                            log.warn(WARN_MESSAGE);
+                            name = SynapseConstants.ANONYMOUS_ENDPOINT;
+                        }
+                        epNameList.add(name);
                     }
 
-                } else {
+                    opCtx.setProperty(ENDPOINT_NAME_LIST, epNameList);
 
-                    // this is the first endpoint in the heirachy. so create the queue and insert
-                    // this as the first element.
-                    List endpointList = new ArrayList();
-                    endpointList.add(this);
+                } else {
+                    Object o = opCtx.getProperty(ENDPOINT_LIST);
+                    List<Endpoint> endpointList;
+                    if (o instanceof List) {
+                        endpointList = (List<Endpoint>) o;
+                        endpointList.add(this);
 
+                    } else {
+                        // this is the first endpoint in the hierarchy, so create the queue and insert
+                        // this endpoint as the first element.
+                        endpointList = new ArrayList();
+                        endpointList.add(this);
+                        opCtx.setProperty(ENDPOINT_LIST, endpointList);
+                    }
                     // if the next endpoint is not a session affinity one, endpoint sequence ends
                     // here. but we have to add the next endpoint to the list.
                     if (!(endpoint instanceof SALoadbalanceEndpoint)) {
                         endpointList.add(endpoint);
                     }
-
-                    opCtx.setProperty(ENDPOINT_LIST, endpointList);
                 }
 
-
             } else {
                 dispatcher.updateSession(synMessageContext, dispatcherContext, endpoint);
             }
@@ -294,7 +275,7 @@
      * @param isClusteringEnable
      */
     public void updateSession(MessageContext responseMsgCtx, List endpointList,
-        boolean isClusteringEnable) {
+                              boolean isClusteringEnable) {
 
         Endpoint endpoint = null;
 
@@ -303,12 +284,12 @@
             // Only keeps endpoint names , because , it is heavy task to
             // replicate endpoint itself
             String epNameObj = (String) endpointList.remove(0);
-            for (Iterator it = endpointList.iterator(); it.hasNext();) {
-                Object epObj = it.next();
-                if (epObj != null && epObj instanceof Endpoint) {
-                    String name = ((Endpoint) epObj).getName();
+            for (Endpoint ep :endpoints) {
+                if (ep != null) {
+                    String name = ep.getName();
                     if (name != null && name.equals(epNameObj)) {
-                        endpoint = ((Endpoint) epObj);
+                        endpoint = ep;
+                        break;
                     }
                 }
             }
@@ -356,8 +337,27 @@
      */
     public boolean isActive(MessageContext synMessageContext) {
         // todo: implement above
-
-        return endpointContext.isActive();
+        boolean active;
+        Endpoint endpoint = dispatcher.getEndpoint(synMessageContext, dispatcherContext);
+        if (endpoint == null) { // If a session is not started
+            active = endpointContext.isActive();
+            if (!active && endpoints != null) {
+                for (Endpoint ep : endpoints) {
+                    if (ep.isActive(synMessageContext)) { //AND at least one child endpoint is active
+                        active = true;
+                        endpointContext.setActive(true);
+                        // don't break the loop though we found one active endpoint. calling isActive()
+                        // on all child endpoints will update their active state. so this is a good
+                        // time to do that.
+                    }
+                }
+            }
+        } else {
+            //If a session is started AND the binding endpoint is active.
+            active = endpoint.isActive(synMessageContext);
+            endpointContext.setActive(active);
+        }
+        return active;
     }
 
     public void setActive(boolean active, MessageContext synMessageContext) {

Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/endpoints/WSDLEndpoint.java Fri May  9 11:38:05 2008
@@ -240,6 +240,11 @@
                 endpointContext.setActive(true);
             }
         }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Endpoint  '" + name + "' is in state ' " + active + " '");
+        }
+
         return active;
     }
 

Modified: synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java?rev=654906&r1=654905&r2=654906&view=diff
==============================================================================
--- synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java (original)
+++ synapse/branches/1.2/modules/core/src/main/java/org/apache/synapse/mediators/builtin/SendMediator.java Fri May  9 11:38:05 2008
@@ -64,97 +64,93 @@
             }
         }
 
-        // if no endpoints are defined, send where implicitly stated
-        if (endpoint == null) {
+        if (synCtx.isResponse()) {
 
-            if (traceOrDebugOn) {
-                StringBuffer sb = new StringBuffer();
-                sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
-                        + " message using implicit message properties..");
-                sb.append("\nSending To: " + (synCtx.getTo() != null ?
-                        synCtx.getTo().getAddress() : "null"));
-                sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
-                        synCtx.getWSAAction() : "null"));
-                traceOrDebug(traceOn, sb.toString());
-            }
+            Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
+            OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
 
-            if (traceOn && trace.isTraceEnabled()) {
-                trace.trace("Envelope : " + synCtx.getEnvelope());
-            }
+            boolean isClusteringEnable = false;
 
-            if (synCtx.isResponse()) {
+            // get Axis2 MessageContext and ConfigurationContext
+            org.apache.axis2.context.MessageContext axisMC =
+                    axis2MsgCtx.getAxis2MessageContext();
+            ConfigurationContext cc = axisMC.getConfigurationContext();
 
-                Axis2MessageContext axis2MsgCtx = (Axis2MessageContext) synCtx;
-                OperationContext opCtx = axis2MsgCtx.getAxis2MessageContext().getOperationContext();
-                Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+            // Check for a clustering environment
 
-                if (o != null && o instanceof List) {
+            ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
+            if (clusterManager != null &&
+                    clusterManager.getContextManager() != null) {
+                isClusteringEnable = true;
+            }
 
-                    boolean isClusteringEnable = false;
+            if (isClusteringEnable) {
+                // In a clustering environment,
+                // keep only the endpoint names, because it is a heavy task to
+                // replicate the endpoints themselves.
+                Object epNames = opCtx.getPropertyNonReplicable(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
+                if (epNames != null && epNames instanceof List) {
+
+                    List epNameList = (List) epNames;
+                    Object obj = epNameList.remove(0);
+                    if (obj != null && obj instanceof String) {
+                        Object rootEPObj = opCtx.getPropertyNonReplicable(
+                                SALoadbalanceEndpoint.ROOT_ENDPOINT);
+
+                        if (rootEPObj != null && rootEPObj instanceof Endpoint) {
+                            String name = ((Endpoint) rootEPObj).getName();
+
+                            if (name != null && name.equals(obj)) {
+                                Endpoint rootEP = ((Endpoint) rootEPObj);
+
+                                if (rootEP instanceof SALoadbalanceEndpoint) {
+                                    SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) rootEP;
+                                    salEP.updateSession(synCtx, epNameList,
+                                            isClusteringEnable);
+                                }
+                            }
+                        }
 
-                    // get Axis2 MessageContext and ConfigurationContext
-                    org.apache.axis2.context.MessageContext axisMC =
-                            axis2MsgCtx.getAxis2MessageContext();
-                    ConfigurationContext cc = axisMC.getConfigurationContext();
-
-                    //The heck for clustering environment
-
-                    ClusterManager clusterManager = cc.getAxisConfiguration().getClusterManager();
-                    if (clusterManager != null &&
-                            clusterManager.getContextManager() != null) {
-                        isClusteringEnable = true;
                     }
+                    opCtx.setProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST, epNames);
+                }
+
+            } else {
+                Object o = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_LIST);
+                if (o != null && o instanceof List) {
                     // we are in the response of the first message of a server initiated session
                     // so update all session maps
                     List epList = (List) o;
+                    Object e = epList.remove(0);
 
-
-                    if (isClusteringEnable) {
-                        // if this is a clustering env.
-                        // Only keeps endpoint names , because , it is heavy task to
-                        // replicate endpoint itself
-                        Object epNames = opCtx.getProperty(SALoadbalanceEndpoint.ENDPOINT_NAME_LIST);
-                        if (epNames != null && epNames instanceof List) {
-
-                            List epNameList = (List) epNames;
-                            Object obj = epNameList.remove(0);
-                            if (obj != null && obj instanceof String) {
-
-                                for (Iterator it = epList.iterator(); it.hasNext();) {
-                                    Object epObj = it.next();
-
-                                    if (epObj != null && epObj instanceof Endpoint) {
-                                        String name = ((Endpoint) epObj).getName();
-
-                                        if (name != null && name.equals(obj)) {
-                                            Endpoint ep = ((Endpoint) epObj);
-
-                                            if (ep instanceof SALoadbalanceEndpoint) {
-                                                SALoadbalanceEndpoint salEP
-                                                        = (SALoadbalanceEndpoint) ep;
-                                                salEP.updateSession(synCtx, epNameList,
-                                                        isClusteringEnable);
-                                            }
-                                        }
-                                    }
-                                }
-
-                            }
-                        }
-
-                    } else {
-                        Object e = epList.remove(0);
-
-                        if (e != null) {
-                            if (e instanceof SALoadbalanceEndpoint) {
-                                SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
-                                salEP.updateSession(synCtx, epList, isClusteringEnable);
-                            }
+                    if (e != null) {
+                        if (e instanceof SALoadbalanceEndpoint) {
+                            SALoadbalanceEndpoint salEP = (SALoadbalanceEndpoint) e;
+                            salEP.updateSession(synCtx, epList, isClusteringEnable);
                         }
                     }
                 }
             }
 
+        }
+
+        // if no endpoints are defined, send where implicitly stated
+        if (endpoint == null) {
+
+            if (traceOrDebugOn) {
+                StringBuffer sb = new StringBuffer();
+                sb.append("Sending " + (synCtx.isResponse() ? "response" : "request")
+                        + " message using implicit message properties..");
+                sb.append("\nSending To: " + (synCtx.getTo() != null ?
+                        synCtx.getTo().getAddress() : "null"));
+                sb.append("\nSOAPAction: " + (synCtx.getWSAAction() != null ?
+                        synCtx.getWSAAction() : "null"));
+                traceOrDebug(traceOn, sb.toString());
+            }
+
+            if (traceOn && trace.isTraceEnabled()) {
+                trace.trace("Envelope : " + synCtx.getEnvelope());
+            }
             synCtx.getEnvironment().send(null, synCtx);
 
         } else {