You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by az...@apache.org on 2011/12/06 10:31:10 UTC
svn commit: r1210832 - in /synapse/trunk/java:
modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/
modules/core/src/main/java/org/apache/synapse/core/axis2/
modules/core/src/main/java/org/apache/synapse/endpoints/
repository/conf/sample/
Author: azeez
Date: Tue Dec 6 09:31:09 2011
New Revision: 1210832
URL: http://svn.apache.org/viewvc?rev=1210832&view=rev
Log:
Added ServiceDynamicLoadbalanceEndpoint for dynamic load balancing of services. For more details, see ServiceDynamicLoadbalanceEndpoint in http://blog.afkham.org/2011/09/wso2-load-balancer-how-it-works.html
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java
synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml
- copied, changed from r1210743, synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java?rev=1210832&r1=1210831&r2=1210832&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java Tue Dec 6 09:31:09 2011
@@ -227,6 +227,12 @@ public abstract class EndpointFactory im
return DynamicLoadbalanceEndpointFactory.getInstance();
}
+ OMElement sdlbElement = configElement.getFirstChildWithName
+ (new QName(SynapseConstants.SYNAPSE_NAMESPACE, "serviceDynamicLoadbalance"));
+ if (sdlbElement != null) {
+ return ServiceDynamicLoadbalanceEndpointFactory.getInstance();
+ }
+
OMElement foElement = configElement.getFirstChildWithName
(new QName(SynapseConstants.SYNAPSE_NAMESPACE, "failover"));
if (foElement != null) {
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java?rev=1210832&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/ServiceDynamicLoadbalanceEndpointFactory.java Tue Dec 6 09:31:09 2011
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.config.xml.endpoints;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.config.xml.endpoints.utils.LoadbalanceAlgorithmFactory;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.ServiceDynamicLoadbalanceEndpoint;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import org.apache.synapse.endpoints.dispatch.Dispatcher;
+import org.apache.synapse.endpoints.dispatch.HttpSessionDispatcher;
+import org.apache.synapse.endpoints.dispatch.SoapSessionDispatcher;
+
+import javax.xml.namespace.QName;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.*;
+
+/**
+ * Creates {@link org.apache.synapse.endpoints.ServiceDynamicLoadbalanceEndpoint} using an XML configuration.
+ * <p/>
+ * <pre>
+ * <endpoint name="sdLB">
+ * <serviceDynamicLoadbalance algorithm="org.apache.synapse.endpoints.algorithms.RoundRobin"
+ * configuration="file:repository/conf/lbservices.xml"/>
+ * </endpoint>
+ * </pre>
+ * <p/>
+ * The configuration file has the following format. The same loadBalancerConfig element can also be specified inline, as a child of the serviceDynamicLoadbalance element.
+ * <loadBalancerConfig>
+ * <services>
+ * <service>
+ * <hosts>
+ * <host>test1.synapse.apache.org</host>
+ * <host>test1.apache.org</host>
+ * </hosts>
+ * <domain>test1.synapse.apache.domain</domain>
+ * </service>
+ * <service>
+ * <hosts>
+ * <host>test2.synapse.apache.org</host>
+ * <host>test2.apache.org</host>
+ * </hosts>
+ * <domain>test2.synapse.apache.domain</domain>
+ * </service>
+ * </services>
+ * </loadBalancerConfig>
+ */
+public class ServiceDynamicLoadbalanceEndpointFactory extends EndpointFactory {
+
+ private static ServiceDynamicLoadbalanceEndpointFactory instance =
+ new ServiceDynamicLoadbalanceEndpointFactory();
+ public static final QName SERVICES_QNAME = new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+ "services");
+ public static final QName LB_CONFIG_QNAME = new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+ "loadBalancerConfig");
+
+ /**
+ * Private constructor: the only instance of this factory is the one held in the static
+ * {@code instance} field and handed out by {@link #getInstance()}.
+ */
+ private ServiceDynamicLoadbalanceEndpointFactory() {
+ }
+
+ /**
+ * Returns the shared factory instance held in the static {@code instance} field.
+ *
+ * @return the single ServiceDynamicLoadbalanceEndpointFactory instance
+ */
+ public static ServiceDynamicLoadbalanceEndpointFactory getInstance() {
+ return instance;
+ }
+
+ /**
+  * Creates a {@link ServiceDynamicLoadbalanceEndpoint} from the given endpoint configuration.
+  * <p/>
+  * The host to clustering domain mappings are read either from the document referenced by
+  * the {@code configuration} attribute of the {@code serviceDynamicLoadbalance} element, or,
+  * when that attribute is absent, from an inline {@code loadBalancerConfig} child element.
+  *
+  * @param epConfig          the endpoint configuration element
+  * @param anonymousEndpoint not used by this factory
+  * @param properties        not used by this factory
+  * @return the configured endpoint, or null if epConfig has no
+  *         {@code serviceDynamicLoadbalance} child
+  * @throws RuntimeException if the configuration is incomplete or inconsistent
+  */
+ protected Endpoint createEndpoint(OMElement epConfig, boolean anonymousEndpoint,
+                                   Properties properties) {
+
+     OMElement loadbalanceElement =
+             epConfig.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+                     "serviceDynamicLoadbalance"));
+     if (loadbalanceElement == null) {
+         return null;
+     }
+
+     Map<String, String> hostDomainMap =
+             readHostDomainMap(locateServicesElement(loadbalanceElement));
+
+     LoadbalanceAlgorithm algorithm =
+             LoadbalanceAlgorithmFactory.
+                     createLoadbalanceAlgorithm(loadbalanceElement, null);
+
+     ServiceDynamicLoadbalanceEndpoint loadbalanceEndpoint =
+             new ServiceDynamicLoadbalanceEndpoint(hostDomainMap, algorithm);
+
+     // set endpoint name
+     OMAttribute name =
+             epConfig.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "name"));
+     if (name != null) {
+         loadbalanceEndpoint.setName(name.getAttributeValue());
+     }
+
+     applySessionConfiguration(epConfig, loadbalanceEndpoint);
+     loadbalanceEndpoint.setFailover(false);
+
+     return loadbalanceEndpoint;
+ }
+
+ /**
+  * Finds the {@code services} element, either in the external configuration document or in
+  * the inline {@code loadBalancerConfig} element.
+  *
+  * @param loadbalanceElement the {@code serviceDynamicLoadbalance} element
+  * @return the {@code services} element, never null
+  */
+ private OMElement locateServicesElement(OMElement loadbalanceElement) {
+     String configuration =
+             loadbalanceElement.getAttributeValue(new QName(XMLConfigConstants.NULL_NAMESPACE,
+                     "configuration"));
+     OMElement servicesEle = null;
+     if (configuration != null) {
+         if (configuration.startsWith("$system:")) {
+             String propertyName = configuration.substring("$system:".length());
+             configuration = System.getProperty(propertyName);
+             if (configuration == null) {
+                 // Fail with a message naming the property instead of a confusing
+                 // "could not load ... null" further down.
+                 handleException("System property " + propertyName + " which should point " +
+                         "to the ServiceDynamicLoadbalanceEndpoint configuration file " +
+                         "is not set");
+             }
+         }
+         // Load the file
+         InputStream in = null;
+         try {
+             in = new URL(configuration).openStream();
+             OMElement documentEle = new StAXOMBuilder(in).getDocumentElement();
+             // The OM tree is built lazily from the stream, so it has to be fully built
+             // before the stream is closed in the finally block.
+             documentEle.build();
+             servicesEle = documentEle.getFirstChildWithName(SERVICES_QNAME);
+         } catch (Exception e) {
+             // The cause is appended so that the reason for the failure is not lost
+             handleException("Could not load ServiceDynamicLoadbalanceEndpoint configuration file " +
+                     configuration + " : " + e);
+         } finally {
+             if (in != null) {
+                 try {
+                     in.close();
+                 } catch (IOException ignored) {
+                     // nothing useful can be done if closing the stream fails
+                 }
+             }
+         }
+     } else {
+         OMElement lbConfigEle = loadbalanceElement.getFirstChildWithName(LB_CONFIG_QNAME);
+         if (lbConfigEle == null) {
+             throw new RuntimeException("loadBalancerConfig element not found as a child of " +
+                     "serviceDynamicLoadbalance element");
+         }
+         servicesEle = lbConfigEle.getFirstChildWithName(SERVICES_QNAME);
+     }
+
+     if (servicesEle == null) {
+         throw new RuntimeException("services element not found in serviceDynamicLoadbalance configuration");
+     }
+     return servicesEle;
+ }
+
+ /**
+  * Reads the {@code service} children of the given element into a host to domain map.
+  *
+  * @param servicesEle the {@code services} element
+  * @return a non-empty map with Key - host, Value - clustering domain
+  */
+ private Map<String, String> readHostDomainMap(OMElement servicesEle) {
+     Map<String, String> hostDomainMap = new HashMap<String, String>();
+     for (Iterator<OMElement> iter = servicesEle.getChildrenWithLocalName("service"); iter.hasNext();) {
+         OMElement serviceEle = iter.next();
+         OMElement hostsEle =
+                 serviceEle.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE, "hosts"));
+         if (hostsEle == null) {
+             throw new RuntimeException("hosts element not found as a child of service element");
+         }
+         List<String> hosts = new ArrayList<String>();
+         for (Iterator<OMElement> hostIter = hostsEle.getChildrenWithLocalName("host");
+              hostIter.hasNext();) {
+             OMElement hostEle = hostIter.next();
+             String host = hostEle.getText();
+             if (host.trim().length() == 0) {
+                 throw new RuntimeException("host cannot be null");
+             }
+             hosts.add(host);
+         }
+         OMElement domainEle =
+                 serviceEle.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE,
+                         "domain"));
+         if (domainEle == null) {
+             throw new RuntimeException("domain element not found in as a child of services");
+         }
+         String domain = domainEle.getText();
+         if (domain.trim().length() == 0) {
+             throw new RuntimeException("domain cannot be null");
+         }
+         // A host may belong to one clustering domain only
+         for (String host : hosts) {
+             if (hostDomainMap.containsKey(host)) {
+                 throw new RuntimeException("host " + host + " has been already defined for " +
+                         "clustering domain " + hostDomainMap.get(host));
+             }
+             hostDomainMap.put(host, domain);
+         }
+     }
+     if (hostDomainMap.isEmpty()) {
+         throw new RuntimeException("No service elements defined under services");
+     }
+     return hostDomainMap;
+ }
+
+ /**
+  * Applies the optional {@code session} element (timeout, dispatcher type, session affinity)
+  * to the endpoint. Does nothing when epConfig has no {@code session} child.
+  *
+  * @param epConfig            the endpoint configuration element
+  * @param loadbalanceEndpoint the endpoint being configured
+  */
+ private void applySessionConfiguration(OMElement epConfig,
+                                        ServiceDynamicLoadbalanceEndpoint loadbalanceEndpoint) {
+     // get the session for this endpoint
+     OMElement sessionElement =
+             epConfig.getFirstChildWithName(new QName(SynapseConstants.SYNAPSE_NAMESPACE, "session"));
+     if (sessionElement == null) {
+         return;
+     }
+
+     OMElement sessionTimeout = sessionElement.getFirstChildWithName(
+             new QName(SynapseConstants.SYNAPSE_NAMESPACE, "sessionTimeout"));
+
+     if (sessionTimeout != null) {
+         try {
+             loadbalanceEndpoint.setSessionTimeout(Long.parseLong(
+                     sessionTimeout.getText().trim()));
+         } catch (NumberFormatException nfe) {
+             handleException("Invalid session timeout value : " + sessionTimeout.getText());
+         }
+     }
+
+     String type = sessionElement.getAttributeValue(new QName("type"));
+     if (type == null) {
+         // Previously this surfaced as a bare NullPointerException
+         handleException("The type attribute of the session element is not specified");
+     }
+
+     // Literal-first comparisons stay null-safe whatever handleException does
+     if ("soap".equalsIgnoreCase(type)) {
+         Dispatcher soapDispatcher = new SoapSessionDispatcher();
+         loadbalanceEndpoint.setDispatcher(soapDispatcher);
+
+     } else if ("http".equalsIgnoreCase(type)) {
+         Dispatcher httpDispatcher = new HttpSessionDispatcher();
+         loadbalanceEndpoint.setDispatcher(httpDispatcher);
+
+     }
+
+     loadbalanceEndpoint.setSessionAffinity(true);
+ }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java?rev=1210832&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/core/axis2/ServiceLoadBalanceMembershipHandler.java Tue Dec 6 09:31:09 2011
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.core.axis2;
+
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.LoadBalanceMembershipHandler;
+import org.apache.synapse.endpoints.DynamicLoadbalanceFaultHandler;
+import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Bridge between Axis2 membership notification and Synapse load balancing
+ */
+public class ServiceLoadBalanceMembershipHandler implements LoadBalanceMembershipHandler {
+ private static final Log log = LogFactory.getLog(ServiceLoadBalanceMembershipHandler.class);
+
+ private ConfigurationContext configCtx;
+
+ /**
+ * Key - Host, Value - DomainAlgorithmContext
+ */
+ private Map<String, DomainAlgorithmContext> hostDomainAlgorithmContextMap =
+ new HashMap<String, DomainAlgorithmContext>();
+ private ClusteringAgent clusteringAgent;
+
+ /**
+  * Creates the handler and prepares, for every configured host, its clustering domain, its
+  * own clone of the load balance algorithm and its own AlgorithmContext.
+  * <p/>
+  * Note that configCtx is only handed to the AlgorithmContext instances here; the
+  * {@code configCtx} field of this handler is set by
+  * {@link #setConfigurationContext(ConfigurationContext)}.
+  *
+  * @param hostDomainMap       Key - host, Value - clustering domain
+  * @param algorithm           the algorithm which is cloned once per host
+  * @param configCtx           passed to each AlgorithmContext
+  * @param isClusteringEnabled passed to each AlgorithmContext
+  * @param endpointName        prefix of each AlgorithmContext id, followed by "." and the host
+  */
+ public ServiceLoadBalanceMembershipHandler(Map<String, String> hostDomainMap,
+                                            LoadbalanceAlgorithm algorithm,
+                                            ConfigurationContext configCtx,
+                                            boolean isClusteringEnabled,
+                                            String endpointName) {
+     for (Map.Entry<String, String> hostToDomain : hostDomainMap.entrySet()) {
+         String host = hostToDomain.getKey();
+         String contextId = endpointName + "." + host;
+         AlgorithmContext perHostContext =
+                 new AlgorithmContext(isClusteringEnabled, configCtx, contextId);
+         DomainAlgorithmContext holder = new DomainAlgorithmContext(
+                 hostToDomain.getValue(), algorithm.clone(), perHostContext);
+         this.hostDomainAlgorithmContextMap.put(host, holder);
+     }
+ }
+
+ /**
+ * Does nothing in this implementation: the per-host algorithm instances are already
+ * created in the constructor.
+ *
+ * @param props     ignored
+ * @param algorithm ignored
+ */
+ public void init(Properties props, LoadbalanceAlgorithm algorithm) {
+ // Nothing to do
+ }
+
+ /**
+  * Sets the Axis2 configuration context and looks up the ClusteringAgent that is used to
+  * find the members of each clustering domain.
+  * <p/>
+  * Nothing is stored unless a ClusteringAgent is found. Otherwise a failed call would leave
+  * {@link #getConfigurationContext()} non-null while the agent is still missing, and callers
+  * that only invoke this method when the context is null would never retry.
+  *
+  * @param configCtx the Axis2 configuration context
+  * @throws SynapseException if clustering is not enabled in the axis2.xml file
+  */
+ public void setConfigurationContext(ConfigurationContext configCtx) {
+     // The following code does the bridging between Axis2 and Synapse load balancing
+     ClusteringAgent agent = configCtx.getAxisConfiguration().getClusteringAgent();
+     if (agent == null) {
+         String msg = "In order to enable load balancing across an Axis2 cluster, " +
+                 "the cluster entry should be enabled in the axis2.xml file";
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+     this.clusteringAgent = agent;
+     this.configCtx = configCtx;
+ }
+
+ /**
+ * Returns the configuration context set through
+ * {@link #setConfigurationContext(ConfigurationContext)}.
+ *
+ * @return the configuration context, or null if setConfigurationContext has not been called
+ *         (the constructor does not assign this field)
+ */
+ public ConfigurationContext getConfigurationContext(){
+ return configCtx;
+ }
+
+ /**
+ * Not supported by this handler: members are selected per host, so this variant always
+ * throws {@link UnsupportedOperationException}.
+ *
+ * @param context The AlgorithmContext (ignored)
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ * @deprecated Use {@link #getNextApplicationMember(String)}
+ */
+ public Member getNextApplicationMember(AlgorithmContext context) {
+ throw new UnsupportedOperationException("This operation is invalid. " +
+ "Call getNextApplicationMember(String host)");
+ }
+
+ /**
+  * Gets the next member to which a request for the given host has to be sent.
+  * <p/>
+  * The host is mapped to its clustering domain, the current members of that domain are
+  * handed to the algorithm kept for the host, and the algorithm selects the member.
+  *
+  * @param host the host name the request was sent to
+  * @return the member selected by the algorithm
+  * @throws SynapseException if the host is not configured, if
+  *                          {@link #setConfigurationContext(ConfigurationContext)} has not
+  *                          been called successfully, or if no GroupManagementAgent exists
+  *                          for the domain of the host
+  */
+ public Member getNextApplicationMember(String host) {
+     DomainAlgorithmContext domainAlgorithmContext = hostDomainAlgorithmContextMap.get(host);
+     if (domainAlgorithmContext == null) {
+         throw new SynapseException("Domain not found for host " + host);
+     }
+     if (clusteringAgent == null) {
+         // clusteringAgent is only assigned in setConfigurationContext
+         String msg = "The clustering agent is not available. setConfigurationContext " +
+                 "should be called before requesting members for host " + host;
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+     String lbDomain = domainAlgorithmContext.getDomain();
+     LoadbalanceAlgorithm algorithm = domainAlgorithmContext.getAlgorithm();
+     GroupManagementAgent groupMgtAgent = clusteringAgent.getGroupManagementAgent(lbDomain);
+     if (groupMgtAgent == null) {
+         String msg =
+                 "A LoadBalanceEventHandler has not been specified in the axis2.xml " +
+                 "file for the domain " + lbDomain + " for host " + host;
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+     // Refresh the member list on every call, as the membership of the domain is dynamic
+     algorithm.setApplicationMembers(groupMgtAgent.getMembers());
+     AlgorithmContext context = domainAlgorithmContext.getAlgorithmContext();
+     return algorithm.getNextApplicationMember(context);
+ }
+
+ /**
+ * Always returns null in this implementation. This handler keeps one algorithm clone per
+ * host (created in the constructor) rather than a single algorithm.
+ *
+ * @return null
+ */
+ public LoadbalanceAlgorithm getLoadbalanceAlgorithm() {
+ return null;
+ }
+
+ /**
+ * Always returns null in this implementation; the Properties passed to
+ * {@link #init(Properties, LoadbalanceAlgorithm)} are not kept.
+ *
+ * @return null
+ */
+ public Properties getProperties() {
+ return null;
+ }
+
+ /**
+  * Immutable holder for the clustering domain, the algorithm and the AlgorithmContext
+  * which belong to a particular host. The references are assigned once in the constructor
+  * and never change, hence the fields are final.
+  */
+ private static class DomainAlgorithmContext {
+     // The clustering domain
+     private final String domain;
+     // The algorithm instance used for this host
+     private final LoadbalanceAlgorithm algorithm;
+     // The context handed to the algorithm when the next member is selected
+     private final AlgorithmContext algorithmContext;
+
+     private DomainAlgorithmContext(String domain, LoadbalanceAlgorithm algorithm,
+                                    AlgorithmContext algorithmContext) {
+         this.domain = domain;
+         this.algorithm = algorithm;
+         this.algorithmContext = algorithmContext;
+     }
+
+     /** @return the algorithm instance of this host */
+     public LoadbalanceAlgorithm getAlgorithm() {
+         return algorithm;
+     }
+
+     /** @return the clustering domain of this host */
+     public String getDomain() {
+         return domain;
+     }
+
+     /** @return the AlgorithmContext of this host */
+     public AlgorithmContext getAlgorithmContext() {
+         return algorithmContext;
+     }
+ }
+}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java?rev=1210832&r1=1210831&r2=1210832&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java Tue Dec 6 09:31:09 2011
@@ -40,7 +40,13 @@ import org.apache.synapse.transport.nhtt
import java.net.MalformedURLException;
import java.net.URL;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
/**
* Represents a dynamic load balance endpoint. The application membership is not static,
@@ -106,7 +112,6 @@ public class DynamicLoadbalanceEndpoint
public void send(MessageContext synCtx) {
SessionInformation sessionInformation = null;
Member currentMember = null;
- //TODO Temp hack: ESB removes the session id from request in a random manner.
setCookieHeader(synCtx);
ConfigurationContext configCtx =
@@ -145,8 +150,6 @@ public class DynamicLoadbalanceEndpoint
}
}
-
- setupTransportHeaders(synCtx);
DynamicLoadbalanceFaultHandlerImpl faultHandler = new DynamicLoadbalanceFaultHandlerImpl();
if (sessionInformation != null && currentMember != null) {
//send message on current session
@@ -209,37 +212,6 @@ public class DynamicLoadbalanceEndpoint
return null;
}
- /**
- * Adds the X-Forwarded-For header to the outgoing message.
- *
- * @param synCtx Current message context
- */
- protected void setupTransportHeaders(MessageContext synCtx) {
- Axis2MessageContext axis2smc = (Axis2MessageContext) synCtx;
- org.apache.axis2.context.MessageContext axis2MessageCtx =
- axis2smc.getAxis2MessageContext();
- Object headers = axis2MessageCtx.getProperty(
- org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
- if (headers != null && headers instanceof Map ) {
- Map headersMap = (Map) headers;
- String xForwardFor = (String) headersMap.get(NhttpConstants.HEADER_X_FORWARDED_FOR);
- String remoteHost = (String) axis2MessageCtx.getProperty(
- org.apache.axis2.context.MessageContext.REMOTE_ADDR);
-
- if (xForwardFor != null && !"".equals(xForwardFor)) {
- StringBuilder xForwardedForString = new StringBuilder();
- xForwardedForString.append(xForwardFor);
- if (remoteHost != null && !"".equals(remoteHost)) {
- xForwardedForString.append(",").append(remoteHost);
- }
- headersMap.put(NhttpConstants.HEADER_X_FORWARDED_FOR, xForwardedForString.toString());
- } else {
- headersMap.put(NhttpConstants.HEADER_X_FORWARDED_FOR,remoteHost);
- }
-
- }
- }
-
public void setName(String name) {
super.setName(name);
// algorithmContext.setContextID(name);
@@ -310,8 +282,7 @@ public class DynamicLoadbalanceEndpoint
}
Map<String, String> memberHosts;
- if ((memberHosts = (Map<String, String>) currentMember.getProperties().get(
- HttpSessionDispatcher.HOSTS)) == null) {
+ if ((memberHosts = (Map<String, String>) currentMember.getProperties().get(HttpSessionDispatcher.HOSTS)) == null) {
currentMember.getProperties().put(HttpSessionDispatcher.HOSTS,
memberHosts = new HashMap<String, String>());
}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java?rev=1210832&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/ServiceDynamicLoadbalanceEndpoint.java Tue Dec 6 09:31:09 2011
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.synapse.endpoints;
+
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.management.DefaultGroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.protocol.HTTP;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.LoadBalanceMembershipHandler;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.core.axis2.ServiceLoadBalanceMembershipHandler;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
+import org.apache.synapse.transport.nhttp.NhttpConstants;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
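+ * Represents a dynamic load balance endpoint which serves several services, each request
+ * being mapped from its target host to the clustering domain configured for that host.
+ * The application membership is not static, but discovered through some mechanism such as
+ * using a GCF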
+ */
+public class ServiceDynamicLoadbalanceEndpoint extends DynamicLoadbalanceEndpoint {
+
+ private static final Log log = LogFactory.getLog(ServiceDynamicLoadbalanceEndpoint.class);
+
+ /**
+ * Axis2 based membership handler which handles members in multiple clustering domains
+ */
+ private ServiceLoadBalanceMembershipHandler slbMembershipHandler;
+
+ /**
+ * Key - host, Value - domain
+ */
+ private Map<String, String> hostDomainMap;
+
+ @Override
+ public void init(SynapseEnvironment synapseEnvironment) {
+ if (!initialized) {
+ super.init(synapseEnvironment);
+ ConfigurationContext cfgCtx =
+ ((Axis2SynapseEnvironment) synapseEnvironment).getAxis2ConfigurationContext();
+ ClusteringAgent clusteringAgent = cfgCtx.getAxisConfiguration().getClusteringAgent();
+ if (clusteringAgent == null) {
+ throw new SynapseException("Axis2 ClusteringAgent not defined in axis2.xml");
+ }
+ // Add the Axis2 GroupManagement agents
+ for (String domain : hostDomainMap.values()) {
+ if (clusteringAgent.getGroupManagementAgent(domain) == null) {
+ clusteringAgent.addGroupManagementAgent(new DefaultGroupManagementAgent(), domain);
+ }
+ }
+ slbMembershipHandler = new ServiceLoadBalanceMembershipHandler(hostDomainMap,
+ getAlgorithm(),
+ cfgCtx,
+ isClusteringEnabled,
+ getName());
+
+ // Initialize the SAL Sessions if already has not been initialized.
+ SALSessions salSessions = SALSessions.getInstance();
+ if (!salSessions.isInitialized()) {
+ salSessions.initialize(isClusteringEnabled, cfgCtx);
+ }
+ initialized = true;
+ log.info("ServiceDynamicLoadbalanceEndpoint initialized");
+ }
+ }
+
+ /**
+ * Creates the endpoint. The membership handler is not created here but in
+ * {@link #init(SynapseEnvironment)}.
+ *
+ * @param hostDomainMap Key - host, Value - clustering domain; the reference is stored as
+ *                      is, without copying
+ * @param algorithm     the load balance algorithm, passed on to setAlgorithm
+ */
+ public ServiceDynamicLoadbalanceEndpoint(Map<String, String> hostDomainMap,
+ LoadbalanceAlgorithm algorithm) {
+
+ this.hostDomainMap = hostDomainMap;
+ setAlgorithm(algorithm);
+ }
+
+ /**
+ * Returns the membership handler of this endpoint.
+ *
+ * @return the ServiceLoadBalanceMembershipHandler created in init, or null if this endpoint
+ *         has not been initialized yet
+ */
+ public LoadBalanceMembershipHandler getLbMembershipHandler() {
+ return slbMembershipHandler;
+ }
+
+ /**
+ * Returns a read-only view of the host to clustering domain mapping.
+ *
+ * @return an unmodifiable map with Key - host, Value - domain
+ */
+ public Map<String, String> getHostDomainMap() {
+ return Collections.unmodifiableMap(hostDomainMap);
+ }
+
+ /**
+  * Sends the message to a member of the clustering domain which is configured for the
+  * target host of the request.
+  * <p/>
+  * With session affinity enabled, a message which belongs to a known session is sent to
+  * the member bound to that session. In all other cases the membership handler selects
+  * the next member for the target host.
+  *
+  * @param synCtx the current message context
+  * @throws SynapseException if no application member is available
+  */
+ public void send(MessageContext synCtx) {
+     setCookieHeader(synCtx);
+     //TODO: Refactor Session Aware LB dispatching code
+
+     // Look for a valid session which can be used for session aware dispatching
+     Member member = null;
+     SessionInformation session = null;
+     if (isSessionAffinityBasedLB()) {
+         // The message context may already carry the session and its member
+         session = (SessionInformation) synCtx.getProperty(
+                 SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+         member = (Member) synCtx.getProperty(
+                 SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+
+         if (session == null && member == null) {
+             session = dispatcher.getSession(synCtx);
+             if (session != null) {
+                 if (log.isDebugEnabled()) {
+                     log.debug("Current session id : " + session.getId());
+                 }
+
+                 member = session.getMember();
+                 synCtx.setProperty(
+                         SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER, member);
+                 // Keep the session information on the message context, so that it can be
+                 // recovered reliably even if the cleaner removes it while the response
+                 // is on its way. The session information is not a heavy data structure.
+                 synCtx.setProperty(
+                         SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, session);
+             }
+         }
+     }
+
+     // Dispatch the request to the relevant member
+     String requestedHost = getTargetHost(synCtx);
+     ConfigurationContext axis2ConfigCtx =
+             ((Axis2MessageContext) synCtx).getAxis2MessageContext().getConfigurationContext();
+     if (slbMembershipHandler.getConfigurationContext() == null) {
+         slbMembershipHandler.setConfigurationContext(axis2ConfigCtx);
+     }
+     ServiceDynamicLoadbalanceFaultHandlerImpl faultHandler = new ServiceDynamicLoadbalanceFaultHandlerImpl();
+     faultHandler.setHost(requestedHost);
+
+     if (session == null || member == null) {
+         // prepare for a new session
+         member = slbMembershipHandler.getNextApplicationMember(requestedHost);
+         if (member == null) {
+             String msg = "No application members available";
+             log.error(msg);
+             throw new SynapseException(msg);
+         }
+         sendToApplicationMember(synCtx, member, faultHandler, true);
+         return;
+     }
+
+     // send the message on the current session
+     session.updateExpiryTime();
+     sendToApplicationMember(synCtx, member, faultHandler, false);
+ }
+
+ private String getTargetHost(MessageContext synCtx) {
+ org.apache.axis2.context.MessageContext axis2MessageContext =
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+ Map<String, String> headers =
+ (Map<String, String>) axis2MessageContext.
+ getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
+ String address = headers.get(HTTP.TARGET_HOST);
+ synCtx.setProperty("LB_REQUEST_HOST", address); // Need to set with the port
+ if (address.contains(":")) {
+ address = address.substring(0, address.indexOf(":"));
+ }
+ return address;
+ }
+
+ /**
+  * This FaultHandler will try to resend the message to another member if an error occurs
+  * while sending to some member. This is a failover mechanism.
+  * <p/>
+  * A resend is attempted for connection failures only (CONNECTION_FAILED, CONNECT_CANCEL,
+  * CONNECT_TIMEOUT), and the handler stops retrying once it has been called
+  * MAX_RETRY_COUNT times.
+  */
+ private class ServiceDynamicLoadbalanceFaultHandlerImpl extends DynamicLoadbalanceFaultHandler {
+
+     private EndpointReference to;
+     private Member currentMember;
+     private Endpoint currentEp;
+     // The target host of the request, used to select another member on failover
+     private String host;
+
+     private static final int MAX_RETRY_COUNT = 5;
+
+     // ThreadLocal variable to keep track of how many times this fault handler has been
+     // called
+     private ThreadLocal<Integer> callCount = new ThreadLocal<Integer>() {
+         protected Integer initialValue() {
+             return 0;
+         }
+     };
+
+     public void setHost(String host) {
+         this.host = host;
+     }
+
+     public void setCurrentMember(Member currentMember) {
+         this.currentMember = currentMember;
+     }
+
+     public void setTo(EndpointReference to) {
+         this.to = to;
+     }
+
+     private ServiceDynamicLoadbalanceFaultHandlerImpl() {
+     }
+
+     /**
+      * Suspends the failed member and, for connection failures, resends the message to the
+      * next member available for the host.
+      *
+      * @param synCtx the message context of the failed message
+      * @throws SynapseException if no other application member is available
+      */
+     public void onFault(MessageContext synCtx) {
+         if (currentMember == null) {
+             return;
+         }
+         currentMember.suspend(10000); // TODO: Make this configurable.
+         log.info("Suspended member " + currentMember + " for 10s");
+
+         // Prevent infinite retrying to failed members
+         callCount.set(callCount.get() + 1);
+         if (callCount.get() >= MAX_RETRY_COUNT) {
+             return;
+         }
+
+         //cleanup endpoint if exists
+         if (currentEp != null) {
+             currentEp.destroy();
+         }
+         Integer errorCode = (Integer) synCtx.getProperty(SynapseConstants.ERROR_CODE);
+         if (errorCode != null) {
+             if (errorCode.equals(NhttpConstants.CONNECTION_FAILED) ||
+                     errorCode.equals(NhttpConstants.CONNECT_CANCEL) ||
+                     errorCode.equals(NhttpConstants.CONNECT_TIMEOUT)) {
+                 // Try to resend to another member
+                 Member newMember = slbMembershipHandler.getNextApplicationMember(host);
+                 if (newMember == null) {
+                     String msg = "No application members available";
+                     log.error(msg);
+                     throw new SynapseException(msg);
+                 }
+                 log.info("Failed over to " + newMember);
+                 synCtx.setTo(to);
+                 if (isSessionAffinityBasedLB()) {
+                     // We are sending this message on a new session,
+                     // hence we need to remove previous session information
+                     Set pros = synCtx.getPropertyKeySet();
+                     if (pros != null) {
+                         pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+                     }
+                 }
+                 try {
+                     Thread.sleep(1000); // Sleep for sometime before retrying
+                 } catch (InterruptedException e) {
+                     // Restore the interrupt status so that the code further up the call
+                     // stack can still see that this thread was interrupted
+                     Thread.currentThread().interrupt();
+                 }
+                 sendToApplicationMember(synCtx, newMember, this, true);
+             } else if (errorCode.equals(NhttpConstants.SND_IO_ERROR_SENDING) ||
+                     errorCode.equals(NhttpConstants.CONNECTION_CLOSED)) {
+                 // TODO: Envelope is consumed
+             }
+         }
+         // We cannot failover since we are using binary relay
+     }
+
+     public void setCurrentEp(Endpoint currentEp) {
+         this.currentEp = currentEp;
+     }
+ }
+}
Copied: synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml (from r1210743, synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml)
URL: http://svn.apache.org/viewvc/synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml?p2=synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml&p1=synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml&r1=1210743&r2=1210832&rev=1210832&view=diff
==============================================================================
--- synapse/trunk/java/repository/conf/sample/synapse_sample_57.xml (original)
+++ synapse/trunk/java/repository/conf/sample/synapse_sample_60.xml Tue Dec 6 09:31:09 2011
@@ -18,20 +18,33 @@
~ under the License.
-->
-<!-- Session less load balancing between 3 endpoints -->
+<!-- Load balancing between 2 Cloud Services -->
<definitions xmlns="http://ws.apache.org/ns/synapse">
<sequence name="main" onError="errorHandler">
<in>
+ <property name="SERVICE_PREFIX" expression="$axis2:SERVICE_PREFIX"/>
<send>
- <endpoint name="dynamicLB">
- <dynamicLoadbalance failover="true"
- algorithm="org.apache.synapse.endpoints.algorithms.RoundRobin">
- <membershipHandler
- class="org.apache.synapse.core.axis2.Axis2LoadBalanceMembershipHandler">
- <property name="applicationDomain" value="apache.axis2.app.domain"/>
- </membershipHandler>
- </dynamicLoadbalance>
+ <endpoint name="sdlbEndpoint">
+ <serviceDynamicLoadbalance failover="true"
+ algorithm="org.apache.synapse.endpoints.algorithms.RoundRobin">
+ <loadBalancerConfig>
+ <services>
+ <service>
+ <hosts>
+ <host>test1.synapse.apache.org</host>
+ </hosts>
+ <domain>test1.synapse.domain</domain>
+ </service>
+ <service>
+ <hosts>
+ <host>test2.synapse.apache.org</host>
+ </hosts>
+ <domain>test2.synapse.domain</domain>
+ </service>
+ </services>
+ </loadBalancerConfig>
+ </serviceDynamicLoadbalance>
</endpoint>
</send>
<drop/>