You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by az...@apache.org on 2011/12/06 10:31:10 UTC

svn commit: r1210832 - in /synapse/trunk/java: modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ modules/core/src/main/java/org/apache/synapse/core/axis2/ modules/core/src/main/java/org/apache/synapse/endpoints/ repository/conf/sample/

Author: azeez
Date: Tue Dec  6 09:31:09 2011
New Revision: 1210832

URL: http://svn.apache.org/viewvc?rev=1210832&view=rev
Log:
Added ServiceDynamicLoadbalanceEndpoint for dynamic load balancing of services. For more details, see ServiceDynamicLoadbalanceEndpoint in http://blog.afkham.org/2011/09/wso2-load-balancer-how-it-works.html

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java
    synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml
      - copied, changed from r1210743, synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java?rev=1210832&r1=1210831&r2=1210832&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java Tue Dec  6 09:31:09 2011
@@ -227,6 +227,12 @@ public abstract class EndpointFactory im
             return DynamicLoadbalanceEndpointFactory.getInstance();
         }
 
+        OMElement sdlbElement = configElement.getFirstChildWithName
+                (new QName(SynapseConstants.SYNAPSE_NAMESPACE, "serviceDynamicLoadbalance"));
+        if (sdlbElement != null) {
+            return ServiceDynamicLoadbalanceEndpointFactory.getInstance();
+        }
+        
         OMElement foElement = configElement.getFirstChildWithName
                 (new QName(SynapseConstants.SYNAPSE_NAMESPACE, "failover"));
         if (foElement != null) {

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java?rev=1210832&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java Tue Dec  6 09:31:09 2011
@@ -0,0 +1,212 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.config.xml.endpoints;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.config.xml.endpoints.utils.LoadbalanceAlgorithmFactory;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.ServiceDynamicLoadbalanceEndpoint;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import org.apache.synapse.endpoints.dispatch.Dispatcher;
+import org.apache.synapse.endpoints.dispatch.HttpSessionDispatcher;
+import org.apache.synapse.endpoints.dispatch.SoapSessionDispatcher;
+
+import javax.xml.namespace.QName;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * Creates {@link org.apache.synapse.endpoints.ServiceDynamicLoadbalanceEndpoint} instances from
+ * an XML configuration.
+ * <p/>
+ * <pre>
+ * &lt;endpoint name="sdLB"&gt;
+ *       &lt;serviceDynamicLoadbalance algorithm="org.apache.synapse.endpoints.algorithms.RoundRobin"
+ *                                  configuration="file:repository/conf/lbservices.xml"/&gt;
+ * &lt;/endpoint&gt;
+ * </pre>
+ * <p/>
+ * The <code>configuration</code> attribute is a URL pointing to the load balancer configuration
+ * file. A value of the form <code>$system:propertyName</code> is replaced by the value of that
+ * system property. When the attribute is omitted, the configuration has to be given inline as a
+ * <code>loadBalancerConfig</code> child of the <code>serviceDynamicLoadbalance</code> element.
+ * <p/>
+ * The configuration has the following format:
+ * <pre>
+ * &lt;loadBalancerConfig&gt;
+ *   &lt;services&gt;
+ *     &lt;service&gt;
+ *       &lt;hosts&gt;
+ *         &lt;host&gt;test1.synapse.apache.org&lt;/host&gt;
+ *         &lt;host&gt;test1.apache.org&lt;/host&gt;
+ *       &lt;/hosts&gt;
+ *       &lt;domain&gt;test1.synapse.apache.domain&lt;/domain&gt;
+ *     &lt;/service&gt;
+ *     &lt;service&gt;
+ *       &lt;hosts&gt;
+ *         &lt;host&gt;test2.synapse.apache.org&lt;/host&gt;
+ *         &lt;host&gt;test2.apache.org&lt;/host&gt;
+ *       &lt;/hosts&gt;
+ *       &lt;domain&gt;test2.synapse.apache.domain&lt;/domain&gt;
+ *     &lt;/service&gt;
+ *   &lt;/services&gt;
+ * &lt;/loadBalancerConfig&gt;
+ * </pre>
+ * Every host may be mapped to one clustering domain only.
+ */
+public class ServiceDynamicLoadbalanceEndpointFactory extends EndpointFactory {
+
+    private static final ServiceDynamicLoadbalanceEndpointFactory instance =
+            new ServiceDynamicLoadbalanceEndpointFactory();
+    public static final QName SERVICES_QNAME = new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+                                                         "services");
+    public static final QName LB_CONFIG_QNAME = new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+                                                          "loadBalancerConfig");
+
+    private static final String SYSTEM_PROPERTY_PREFIX = "$system:";
+
+    private ServiceDynamicLoadbalanceEndpointFactory() {
+    }
+
+    /**
+     * @return the single instance of this factory
+     */
+    public static ServiceDynamicLoadbalanceEndpointFactory getInstance() {
+        return instance;
+    }
+
+    /**
+     * Builds a {@link ServiceDynamicLoadbalanceEndpoint} from the given endpoint configuration.
+     *
+     * @param epConfig          the <code>endpoint</code> element
+     * @param anonymousEndpoint not used by this factory
+     * @param properties        not used by this factory
+     * @return the configured endpoint, or <code>null</code> if <code>epConfig</code> has no
+     *         <code>serviceDynamicLoadbalance</code> child
+     * @throws RuntimeException if the configuration is missing or invalid
+     */
+    protected Endpoint createEndpoint(OMElement epConfig, boolean anonymousEndpoint,
+                                      Properties properties) {
+
+        OMElement loadbalanceElement =
+                epConfig.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+                                                         "serviceDynamicLoadbalance"));
+        if (loadbalanceElement == null) {
+            return null;
+        }
+
+        Map<String, String> hostDomainMap =
+                buildHostDomainMap(findServicesElement(loadbalanceElement));
+
+        LoadbalanceAlgorithm algorithm =
+                LoadbalanceAlgorithmFactory.
+                        createLoadbalanceAlgorithm(loadbalanceElement, null);
+
+        ServiceDynamicLoadbalanceEndpoint loadbalanceEndpoint =
+                new ServiceDynamicLoadbalanceEndpoint(hostDomainMap, algorithm);
+
+        // set endpoint name
+        OMAttribute name =
+                epConfig.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "name"));
+        if (name != null) {
+            loadbalanceEndpoint.setName(name.getAttributeValue());
+        }
+
+        configureSession(epConfig, loadbalanceEndpoint);
+        loadbalanceEndpoint.setFailover(false);
+
+        return loadbalanceEndpoint;
+    }
+
+    /**
+     * Locates the <code>services</code> element, either in the external file referenced by the
+     * <code>configuration</code> attribute or in the inline <code>loadBalancerConfig</code> child.
+     */
+    private OMElement findServicesElement(OMElement loadbalanceElement) {
+        String configuration =
+                loadbalanceElement.getAttributeValue(new QName(XMLConfigConstants.NULL_NAMESPACE,
+                                                               "configuration"));
+        OMElement servicesEle = null;
+        if (configuration != null) {
+            if (configuration.startsWith(SYSTEM_PROPERTY_PREFIX)) {
+                String propertyName = configuration.substring(SYSTEM_PROPERTY_PREFIX.length());
+                configuration = System.getProperty(propertyName);
+                if (configuration == null) {
+                    handleException("System property " + propertyName + " which should point to " +
+                                    "the ServiceDynamicLoadbalanceEndpoint configuration file " +
+                                    "has not been set");
+                }
+            }
+            // Load the file. The document element is read inside the try block so that the
+            // builder is never dereferenced after a failed load, and parse errors are reported
+            // in the same way as I/O errors.
+            // NOTE(review): the stream opened here is never closed explicitly; the builder may
+            // parse lazily, so closing it at this point needs care - confirm.
+            try {
+                StAXOMBuilder builder = new StAXOMBuilder(new URL(configuration).openStream());
+                servicesEle = builder.getDocumentElement().getFirstChildWithName(SERVICES_QNAME);
+            } catch (Exception e) {
+                // The cause is appended to the message so that it is not lost.
+                // NOTE(review): if EndpointFactory offers a handleException overload that accepts
+                // the cause, prefer it here.
+                handleException("Could not load ServiceDynamicLoadbalanceEndpoint configuration file " +
+                                configuration + " : " + e);
+            }
+        } else {
+            OMElement lbConfigEle = loadbalanceElement.getFirstChildWithName(LB_CONFIG_QNAME);
+            if (lbConfigEle == null) {
+                throw new RuntimeException("loadBalancerConfig element not found as a child of " +
+                                           "serviceDynamicLoadbalance element");
+            }
+            servicesEle = lbConfigEle.getFirstChildWithName(SERVICES_QNAME);
+        }
+
+        if (servicesEle == null) {
+            throw new RuntimeException("services element not found in serviceDynamicLoadbalance configuration");
+        }
+        return servicesEle;
+    }
+
+    /**
+     * Reads all <code>service</code> children of the given <code>services</code> element into a
+     * host to clustering domain map. Host and domain names are stored trimmed.
+     */
+    private Map<String, String> buildHostDomainMap(OMElement servicesEle) {
+        Map<String, String> hostDomainMap = new HashMap<String, String>();
+        for (Iterator<OMElement> iter = servicesEle.getChildrenWithLocalName("service"); iter.hasNext();) {
+            OMElement serviceEle = iter.next();
+            OMElement hostsEle =
+                    serviceEle.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE, "hosts"));
+            if (hostsEle == null) {
+                throw new RuntimeException("hosts element not found as a child of service element");
+            }
+            List<String> hosts = new ArrayList<String>();
+            for (Iterator<OMElement> hostIter = hostsEle.getChildrenWithLocalName("host");
+                 hostIter.hasNext();) {
+                // Trim before storing: surrounding whitespace (e.g. from pretty printed XML)
+                // would otherwise produce a key that can never match a request host.
+                String host = hostIter.next().getText().trim();
+                if (host.length() == 0) {
+                    throw new RuntimeException("host cannot be null");
+                }
+                hosts.add(host);
+            }
+            OMElement domainEle =
+                    serviceEle.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+                                                               "domain"));
+            if (domainEle == null) {
+                throw new RuntimeException("domain element not found as a child of service element");
+            }
+            String domain = domainEle.getText().trim();
+            if (domain.length() == 0) {
+                throw new RuntimeException("domain cannot be null");
+            }
+            for (String host : hosts) {
+                if (hostDomainMap.containsKey(host)) {
+                    throw new RuntimeException("host " + host + " has been already defined for " +
+                                               "clustering domain " + hostDomainMap.get(host));
+                }
+                hostDomainMap.put(host, domain);
+            }
+        }
+        if (hostDomainMap.isEmpty()) {
+            throw new RuntimeException("No service elements defined under services");
+        }
+        return hostDomainMap;
+    }
+
+    /**
+     * Applies the optional <code>session</code> child of the endpoint element: session timeout,
+     * session dispatcher and session affinity.
+     */
+    private void configureSession(OMElement epConfig,
+                                  ServiceDynamicLoadbalanceEndpoint loadbalanceEndpoint) {
+        OMElement sessionElement =
+                epConfig.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE, "session"));
+        if (sessionElement == null) {
+            return;
+        }
+
+        OMElement sessionTimeout = sessionElement.getFirstChildWithName(
+                new QName(SynapseConstants.SYNAPSE_NAMESPACE, "sessionTimeout"));
+        if (sessionTimeout != null) {
+            try {
+                loadbalanceEndpoint.setSessionTimeout(Long.parseLong(
+                        sessionTimeout.getText().trim()));
+            } catch (NumberFormatException nfe) {
+                handleException("Invalid session timeout value : " + sessionTimeout.getText());
+            }
+        }
+
+        String type = sessionElement.getAttributeValue(new QName("type"));
+        if (type == null) {
+            // Report a configuration error instead of failing with a NullPointerException
+            handleException("The type attribute of the session element has not been specified");
+        } else if (type.equalsIgnoreCase("soap")) {
+            Dispatcher soapDispatcher = new SoapSessionDispatcher();
+            loadbalanceEndpoint.setDispatcher(soapDispatcher);
+        } else if (type.equalsIgnoreCase("http")) {
+            Dispatcher httpDispatcher = new HttpSessionDispatcher();
+            loadbalanceEndpoint.setDispatcher(httpDispatcher);
+        }
+
+        loadbalanceEndpoint.setSessionAffinity(true);
+    }
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java?rev=1210832&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java Tue Dec  6 09:31:09 2011
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.core.axis2;
+
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.LoadBalanceMembershipHandler;
+import org.apache.synapse.endpoints.DynamicLoadbalanceFaultHandler;
+import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Bridge between Axis2 membership notification and Synapse load balancing. Each host is mapped
+ * to a clustering domain, and members are picked from the domain of the requested host.
+ */
+public class ServiceLoadBalanceMembershipHandler implements LoadBalanceMembershipHandler {
+    private static final Log log = LogFactory.getLog(ServiceLoadBalanceMembershipHandler.class);
+
+    /**
+     * Set by {@link #setConfigurationContext(ConfigurationContext)}, not by the constructor
+     */
+    private ConfigurationContext configCtx;
+
+    /**
+     * Key - Host, Value - DomainAlgorithmContext
+     */
+    private final Map<String, DomainAlgorithmContext> hostDomainAlgorithmContextMap =
+                                    new HashMap<String, DomainAlgorithmContext>();
+
+    /**
+     * Looked up from the configuration context in
+     * {@link #setConfigurationContext(ConfigurationContext)}
+     */
+    private ClusteringAgent clusteringAgent;
+
+    /**
+     * Creates one algorithm clone and one AlgorithmContext per host.
+     *
+     * @param hostDomainMap       Key - host, Value - clustering domain
+     * @param algorithm           the algorithm which is cloned for every host
+     * @param configCtx           passed on to the AlgorithmContext of every host. It is
+     *                            deliberately not stored in this handler; callers such as
+     *                            ServiceDynamicLoadbalanceEndpoint#send rely on
+     *                            {@link #getConfigurationContext()} returning null until
+     *                            {@link #setConfigurationContext(ConfigurationContext)} is called
+     * @param isClusteringEnabled passed on to the AlgorithmContext of every host
+     * @param endpointName        prefix of the AlgorithmContext ids (endpointName.host)
+     */
+    public ServiceLoadBalanceMembershipHandler(Map<String, String> hostDomainMap,
+                                               LoadbalanceAlgorithm algorithm,
+                                               ConfigurationContext configCtx,
+                                               boolean isClusteringEnabled,
+                                               String endpointName) {
+        for (Map.Entry<String, String> entry : hostDomainMap.entrySet()) {
+            AlgorithmContext algorithmContext =
+                new AlgorithmContext(isClusteringEnabled, configCtx, endpointName + "." + entry.getKey());
+            this.hostDomainAlgorithmContextMap.put(entry.getKey(),
+                                   new DomainAlgorithmContext(entry.getValue(), algorithm.clone(), algorithmContext));
+        }
+    }
+
+    public void init(Properties props, LoadbalanceAlgorithm algorithm) {
+        // Nothing to do
+    }
+
+    /**
+     * Stores the configuration context and looks up the Axis2 ClusteringAgent from it.
+     *
+     * @param configCtx the Axis2 configuration context
+     * @throws SynapseException if no ClusteringAgent is available
+     */
+    public void setConfigurationContext(ConfigurationContext configCtx) {
+        this.configCtx = configCtx;
+
+        // The following code does the bridging between Axis2 and Synapse load balancing
+        clusteringAgent = configCtx.getAxisConfiguration().getClusteringAgent();
+        if(clusteringAgent == null){
+            String msg = "In order to enable load balancing across an Axis2 cluster, " +
+                         "the cluster entry should be enabled in the axis2.xml file";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+    }
+
+    public ConfigurationContext getConfigurationContext(){
+        return configCtx;
+    }
+
+    /**
+     * Not supported by this handler, since the member has to be selected based on the host.
+     *
+     * @param context The AlgorithmContext
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     * @deprecated Use {@link #getNextApplicationMember(String)}
+     */
+    public Member getNextApplicationMember(AlgorithmContext context) {
+        throw new UnsupportedOperationException("This operation is invalid. " +
+                                                "Call getNextApplicationMember(String host)");
+    }
+
+    /**
+     * Gets the next member to which a request for the given host has to be sent. The member is
+     * selected by the algorithm of that host, from the current members of the clustering domain
+     * the host is mapped to.
+     *
+     * @param host the host the request was sent to
+     * @return the member selected by the algorithm
+     * @throws SynapseException if the host is unknown, if the configuration context has not been
+     *                          set yet, or if there is no GroupManagementAgent for the domain
+     */
+    public Member getNextApplicationMember(String host) {
+        DomainAlgorithmContext domainAlgorithmContext = hostDomainAlgorithmContextMap.get(host);
+        if(domainAlgorithmContext == null) {
+            throw new SynapseException("Domain not found for host " + host);
+        }
+        if (clusteringAgent == null) {
+            // Fail with a meaningful message instead of a NullPointerException
+            String msg = "The ClusteringAgent is not available. setConfigurationContext has to " +
+                         "be called before a member can be selected for host " + host;
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+        String lbDomain = domainAlgorithmContext.getDomain();
+        LoadbalanceAlgorithm algorithm = domainAlgorithmContext.getAlgorithm();
+        GroupManagementAgent groupMgtAgent = clusteringAgent.getGroupManagementAgent(lbDomain);
+        if(groupMgtAgent == null){
+            String msg =
+                    "A LoadBalanceEventHandler has not been specified in the axis2.xml " +
+                    "file for the domain " + lbDomain + " for host " + host;
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+        // Refresh the member list on every call, since the membership is dynamic
+        algorithm.setApplicationMembers(groupMgtAgent.getMembers());
+        AlgorithmContext context = domainAlgorithmContext.getAlgorithmContext();
+        return algorithm.getNextApplicationMember(context);
+    }
+
+    /**
+     * @return always null, since there is one algorithm per host
+     */
+    public LoadbalanceAlgorithm getLoadbalanceAlgorithm() {
+        return null;
+    }
+
+    /**
+     * @return always null, since this handler is not configured using properties
+     */
+    public Properties getProperties() {
+        return null;
+    }
+
+    /**
+     * POJO for maintaining the domain, algorithm & AlgorithmContext for a particular host
+     */
+    private static class DomainAlgorithmContext {
+        // The clustering domain
+        private final String domain;
+        private final AlgorithmContext algorithmContext;
+        private final LoadbalanceAlgorithm algorithm;
+
+        private DomainAlgorithmContext(String domain, LoadbalanceAlgorithm algorithm,
+                                       AlgorithmContext algorithmContext) {
+            this.domain = domain;
+            this.algorithm = algorithm;
+            this.algorithmContext = algorithmContext;
+        }
+
+        public LoadbalanceAlgorithm getAlgorithm() {
+            return algorithm;
+        }
+
+        public String getDomain() {
+            return domain;
+        }
+
+        public AlgorithmContext getAlgorithmContext() {
+            return algorithmContext;
+        }
+    }
+}

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java?rev=1210832&r1=1210831&r2=1210832&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java Tue Dec  6 09:31:09 2011
@@ -40,7 +40,13 @@ import org.apache.synapse.transport.nhtt
 
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
 
 /**
  * Represents a dynamic load balance endpoint. The application membership is not static,
@@ -106,7 +112,6 @@ public class DynamicLoadbalanceEndpoint 
     public void send(MessageContext synCtx) {
         SessionInformation sessionInformation = null;
         Member currentMember = null;
-        //TODO Temp hack: ESB removes the session id from request in a random manner.
         setCookieHeader(synCtx);
 
         ConfigurationContext configCtx =
@@ -145,8 +150,6 @@ public class DynamicLoadbalanceEndpoint 
             }
 
         }
-
-        setupTransportHeaders(synCtx);
         DynamicLoadbalanceFaultHandlerImpl faultHandler = new DynamicLoadbalanceFaultHandlerImpl();
         if (sessionInformation != null && currentMember != null) {
             //send message on current session
@@ -209,37 +212,6 @@ public class DynamicLoadbalanceEndpoint 
         return null;
     }
 
-    /**
-     * Adds the X-Forwarded-For header to the outgoing message.
-     *
-     * @param synCtx Current message context
-     */
-	protected void setupTransportHeaders(MessageContext synCtx) {
-		Axis2MessageContext axis2smc = (Axis2MessageContext) synCtx;
-        org.apache.axis2.context.MessageContext axis2MessageCtx =
-                axis2smc.getAxis2MessageContext();
-        Object headers = axis2MessageCtx.getProperty(
-                org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
-        if (headers != null && headers instanceof Map ) {
-        	Map headersMap = (Map) headers;
-        	String xForwardFor = (String) headersMap.get(NhttpConstants.HEADER_X_FORWARDED_FOR);
-        	String remoteHost = (String) axis2MessageCtx.getProperty(
-                    org.apache.axis2.context.MessageContext.REMOTE_ADDR);
-
-            if (xForwardFor != null && !"".equals(xForwardFor)) {
-                StringBuilder xForwardedForString = new StringBuilder();
-                xForwardedForString.append(xForwardFor);
-                if (remoteHost != null && !"".equals(remoteHost)) {
-                    xForwardedForString.append(",").append(remoteHost);
-                }
-                headersMap.put(NhttpConstants.HEADER_X_FORWARDED_FOR, xForwardedForString.toString());
-            } else {
-                headersMap.put(NhttpConstants.HEADER_X_FORWARDED_FOR,remoteHost);
-            }
-
-        }
-	}
-
     public void setName(String name) {
         super.setName(name);
 //        algorithmContext.setContextID(name);
@@ -310,8 +282,7 @@ public class DynamicLoadbalanceEndpoint 
         }
 
         Map<String, String> memberHosts;
-        if ((memberHosts = (Map<String, String>) currentMember.getProperties().get(
-                HttpSessionDispatcher.HOSTS)) == null) {
+        if ((memberHosts = (Map<String, String>) currentMember.getProperties().get(HttpSessionDispatcher.HOSTS)) == null) {
             currentMember.getProperties().put(HttpSessionDispatcher.HOSTS,
                     memberHosts = new HashMap<String, String>());
         }

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java?rev=1210832&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java Tue Dec  6 09:31:09 2011
@@ -0,0 +1,280 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.synapse.endpoints;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.management.DefaultGroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.protocol.HTTP;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.LoadBalanceMembershipHandler;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.core.axis2.ServiceLoadBalanceMembershipHandler;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
+import org.apache.synapse.transport.nhttp.NhttpConstants;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
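+/**
+ * Represents a dynamic load balance endpoint which serves multiple hosts. The application
+ * membership is not static, but discovered through some mechanism such as using a GCF. Every
+ * host is mapped to a clustering domain, and a request is dispatched to a member of the domain
+ * which is mapped to the host given in the HTTP Host header of that request.
+ */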
+public class ServiceDynamicLoadbalanceEndpoint extends DynamicLoadbalanceEndpoint {
+
+    private static final Log log = LogFactory.getLog(ServiceDynamicLoadbalanceEndpoint.class);
+
+    /**
+     * Axis2 based membership handler which handles members in multiple clustering domains
+     */
+    private ServiceLoadBalanceMembershipHandler slbMembershipHandler;
+
+    /**
+     * Key - host, Value - domain
+     */
+    private Map<String, String> hostDomainMap;
+
+    /**
+     * Initializes this endpoint once: registers a GroupManagementAgent for every configured
+     * clustering domain which does not have one yet, creates the membership handler and
+     * initializes the SAL sessions if required.
+     *
+     * @param synapseEnvironment the Synapse environment, which has to be an Axis2SynapseEnvironment
+     * @throws SynapseException if no ClusteringAgent has been defined
+     */
+    @Override
+    public void init(SynapseEnvironment synapseEnvironment) {
+        if (initialized) {
+            return;
+        }
+        super.init(synapseEnvironment);
+
+        Axis2SynapseEnvironment axis2Environment = (Axis2SynapseEnvironment) synapseEnvironment;
+        ConfigurationContext axis2ConfigCtx = axis2Environment.getAxis2ConfigurationContext();
+        ClusteringAgent agent = axis2ConfigCtx.getAxisConfiguration().getClusteringAgent();
+        if (agent == null) {
+            throw new SynapseException("Axis2 ClusteringAgent not defined in axis2.xml");
+        }
+
+        // Make sure that there is an Axis2 GroupManagement agent for each domain
+        for (String clusteringDomain : hostDomainMap.values()) {
+            boolean agentMissing = agent.getGroupManagementAgent(clusteringDomain) == null;
+            if (agentMissing) {
+                agent.addGroupManagementAgent(new DefaultGroupManagementAgent(), clusteringDomain);
+            }
+        }
+
+        slbMembershipHandler = new ServiceLoadBalanceMembershipHandler(
+                hostDomainMap, getAlgorithm(), axis2ConfigCtx, isClusteringEnabled, getName());
+
+        // The SAL Sessions are shared, hence initialize them only if nobody has done so yet
+        SALSessions sessions = SALSessions.getInstance();
+        if (!sessions.isInitialized()) {
+            sessions.initialize(isClusteringEnabled, axis2ConfigCtx);
+        }
+
+        initialized = true;
+        log.info("ServiceDynamicLoadbalanceEndpoint initialized");
+    }
+
+    /**
+     * Creates an endpoint for the given hosts.
+     *
+     * @param hostDomainMap Key - host, Value - clustering domain. The map is not copied
+     * @param algorithm     the load balance algorithm of this endpoint
+     */
+    public ServiceDynamicLoadbalanceEndpoint(Map<String, String> hostDomainMap,
+                                             LoadbalanceAlgorithm algorithm) {
+        setAlgorithm(algorithm);
+        this.hostDomainMap = hostDomainMap;
+    }
+
+    /**
+     * @return the membership handler created in init, or null if init has not been called yet
+     */
+    public LoadBalanceMembershipHandler getLbMembershipHandler() {
+        return this.slbMembershipHandler;
+    }
+
+    /**
+     * @return a read-only view of the host to clustering domain map
+     */
+    public Map<String, String> getHostDomainMap() {
+        Map<String, String> readOnlyView = Collections.unmodifiableMap(this.hostDomainMap);
+        return readOnlyView;
+    }
+
+    /**
+     * Sends the message to an application member. If session affinity is enabled and the message
+     * belongs to a known session, the member of that session is used. Otherwise the next member
+     * is requested from the membership handler for the host the request was sent to.
+     *
+     * @param synCtx the current message context
+     * @throws SynapseException if no application member is available
+     */
+    public void send(MessageContext synCtx) {
+        setCookieHeader(synCtx);
+        //TODO: Refactor Session Aware LB dispatching code
+
+        // Find out whether there is a valid session for session aware dispatching
+        SessionInformation session = null;
+        Member member = null;
+        if (isSessionAffinityBasedLB()) {
+            // The session and its member may have been attached to the message context already
+            session = (SessionInformation) synCtx.getProperty(
+                    SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+            member = (Member) synCtx.getProperty(
+                    SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+
+            boolean nothingAttached = session == null && member == null;
+            if (nothingAttached) {
+                session = dispatcher.getSession(synCtx);
+            }
+            if (nothingAttached && session != null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Current session id : " + session.getId());
+                }
+
+                member = session.getMember();
+                synCtx.setProperty(SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER, member);
+                // Keep the session information in the message context as well, so that it can
+                // be recovered reliably if the cleaner removes it while the response is being
+                // received. This is cheap, since session information is not a heavy structure.
+                synCtx.setProperty(
+                        SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, session);
+            }
+        }
+
+        // Dispatch the request to the relevant member
+        String requestedHost = getTargetHost(synCtx);
+        Axis2MessageContext axis2SynCtx = (Axis2MessageContext) synCtx;
+        ConfigurationContext axis2ConfigCtx =
+                axis2SynCtx.getAxis2MessageContext().getConfigurationContext();
+        if (slbMembershipHandler.getConfigurationContext() == null) {
+            slbMembershipHandler.setConfigurationContext(axis2ConfigCtx);
+        }
+
+        ServiceDynamicLoadbalanceFaultHandlerImpl faultHandler =
+                new ServiceDynamicLoadbalanceFaultHandlerImpl();
+        faultHandler.setHost(requestedHost);
+
+        if (session == null || member == null) {
+            // prepare for a new session
+            member = slbMembershipHandler.getNextApplicationMember(requestedHost);
+            if (member == null) {
+                String msg = "No application members available";
+                log.error(msg);
+                throw new SynapseException(msg);
+            }
+            sendToApplicationMember(synCtx, member, faultHandler, true);
+        } else {
+            // send message on current session
+            session.updateExpiryTime();
+            sendToApplicationMember(synCtx, member, faultHandler, false);
+        }
+    }
+
+    private String getTargetHost(MessageContext synCtx) {
+        org.apache.axis2.context.MessageContext axis2MessageContext =
+                ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+        Map<String, String> headers =
+                (Map<String, String>) axis2MessageContext.
+                        getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
+        String address = headers.get(HTTP.TARGET_HOST);
+        synCtx.setProperty("LB_REQUEST_HOST", address); // Need to set with the port
+        if (address.contains(":")) {
+            address = address.substring(0, address.indexOf(":"));
+        }
+        return address;
+    }
+
+    /**
+     * This FaultHandler will try to resend the message to another member if an error occurs
+     * while sending to some member. This is a failover mechanism.
+     * <p>
+     * Failover is attempted only when the error code on the message context is one of
+     * CONNECTION_FAILED, CONNECT_CANCEL or CONNECT_TIMEOUT, and the number of fault
+     * notifications handled by one instance is bounded by {@link #MAX_RETRY_COUNT}.
+     */
+    private class ServiceDynamicLoadbalanceFaultHandlerImpl extends DynamicLoadbalanceFaultHandler {
+
+        /** Once onFault has been invoked this many times, no further failover is attempted. */
+        private static final int MAX_RETRY_COUNT = 5;
+
+        /** Time a failed member stays suspended, in milliseconds. TODO: Make this configurable. */
+        private static final int MEMBER_SUSPEND_MILLIS = 10000;
+
+        /** Pause before the message is resent to another member, in milliseconds. */
+        private static final long RETRY_DELAY_MILLIS = 1000;
+
+        private EndpointReference to;
+        private Member currentMember;
+        private Endpoint currentEp;
+        private String host;
+
+        // Number of times this fault handler has been called. This is kept per handler
+        // instance rather than per thread, so the retry limit still holds when successive
+        // faults for the same message are reported on different threads.
+        private final java.util.concurrent.atomic.AtomicInteger callCount =
+                new java.util.concurrent.atomic.AtomicInteger();
+
+        public void setHost(String host) {
+            this.host = host;
+        }
+
+        public void setCurrentMember(Member currentMember) {
+            this.currentMember = currentMember;
+        }
+
+        public void setTo(EndpointReference to) {
+            this.to = to;
+        }
+
+        private ServiceDynamicLoadbalanceFaultHandlerImpl() {
+        }
+
+        /**
+         * Suspends the member that failed and, for connection-level errors, resends the
+         * message to the next application member available for the configured host.
+         *
+         * @param synCtx the message context of the failed send
+         * @throws SynapseException if no application member is available to fail over to
+         */
+        public void onFault(MessageContext synCtx) {
+            if (currentMember == null) {
+                return;
+            }
+            currentMember.suspend(MEMBER_SUSPEND_MILLIS);
+            log.info("Suspended member " + currentMember + " for " +
+                     (MEMBER_SUSPEND_MILLIS / 1000) + "s");
+
+            // Prevent infinite retrying to failed members
+            if (callCount.incrementAndGet() >= MAX_RETRY_COUNT) {
+                return;
+            }
+
+            //cleanup endpoint if exists
+            if (currentEp != null) {
+                currentEp.destroy();
+            }
+            Integer errorCode = (Integer) synCtx.getProperty(SynapseConstants.ERROR_CODE);
+            if (errorCode != null) {
+                if (errorCode.equals(NhttpConstants.CONNECTION_FAILED) ||
+                    errorCode.equals(NhttpConstants.CONNECT_CANCEL) ||
+                    errorCode.equals(NhttpConstants.CONNECT_TIMEOUT)) {
+                    // Try to resend to another member
+                    Member newMember = slbMembershipHandler.getNextApplicationMember(host);
+                    if (newMember == null) {
+                        String msg = "No application members available";
+                        log.error(msg);
+                        throw new SynapseException(msg);
+                    }
+                    log.info("Failed over to " + newMember);
+                    synCtx.setTo(to);
+                    if (isSessionAffinityBasedLB()) {
+                        // We are sending this message on a new session,
+                        // hence we need to remove previous session information
+                        Set pros = synCtx.getPropertyKeySet();
+                        if (pros != null) {
+                            pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+                        }
+                    }
+                    try {
+                        Thread.sleep(RETRY_DELAY_MILLIS);  // Sleep for sometime before retrying
+                    } catch (InterruptedException e) {
+                        // Do not lose the interruption: restore the flag so that code further
+                        // up the stack can still observe it, then carry on with the resend.
+                        Thread.currentThread().interrupt();
+                    }
+                    sendToApplicationMember(synCtx, newMember, this, true);
+                } else if (errorCode.equals(NhttpConstants.SND_IO_ERROR_SENDING) ||
+                           errorCode.equals(NhttpConstants.CONNECTION_CLOSED)) {
+                    // TODO: Envelope is consumed
+                }
+            }
+            // We cannot failover since we are using binary relay
+        }
+
+        public void setCurrentEp(Endpoint currentEp) {
+            this.currentEp = currentEp;
+        }
+    }
+}

Copied: synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml (from r1210743, synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml)
URL: http://svn.apache.org/viewvc/synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml?p2=synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml&p1=synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml&r1=1210743&r2=1210832&rev=1210832&view=diff
==============================================================================
--- synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml (original)
+++ synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml Tue Dec  6 09:31:09 2011
@@ -18,20 +18,33 @@
   ~  under the License.
   -->
 
-<!-- Session less load balancing between 3 endpoints -->
+<!-- Load balancing between 2 Cloud Services -->
 <definitions xmlns="http://ws.apache.org/ns/synapse">
 
     <sequence name="main" onError="errorHandler">
         <in>
+            <property name="SERVICE_PREFIX" expression="$axis2:SERVICE_PREFIX"/>
             <send>
-                <endpoint name="dynamicLB">
-                    <dynamicLoadbalance failover="true"
-                                        algorithm="org.apache.synapse.endpoints.algorithms.RoundRobin">
-                        <membershipHandler
-                                class="org.apache.synapse.core.axis2.Axis2LoadBalanceMembershipHandler">
-                            <property name="applicationDomain" value="apache.axis2.app.domain"/>
-                        </membershipHandler>
-                    </dynamicLoadbalance>
+                <endpoint name="sdlbEndpoint">
+                    <serviceDynamicLoadbalance failover="true"
+                                               algorithm="org.apache.synapse.endpoints.algorithms.RoundRobin">
+                        <loadBalancerConfig>
+                            <services>
+                                <service>
+                                    <hosts>
+                                        <host>test1.synapse.apache.org</host>
+                                    </hosts>
+                                    <domain>test1.synapse.domain</domain>
+                                </service>
+                                <service>
+                                    <hosts>
+                                        <host>test2.synapse.apache.org</host>
+                                    </hosts>
+                                    <domain>test2.synapse.domain</domain>
+                                </service>
+                            </services>
+                        </loadBalancerConfig>
+                    </serviceDynamicLoadbalance>
                 </endpoint>
             </send>
             <drop/>