You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2010/02/23 09:20:06 UTC
svn commit: r915237 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse:
config/xml/endpoints/LoadbalanceEndpointFactory.java
config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java
endpoints/LoadbalanceEndpoint.java
Author: indika
Date: Tue Feb 23 08:20:06 2010
New Revision: 915237
URL: http://svn.apache.org/viewvc?rev=915237&view=rev
Log:
Adding back the member based load balancing which was removed a few months back.
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java?rev=915237&r1=915236&r2=915237&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java Tue Feb 23 08:20:06 2010
@@ -85,7 +85,6 @@
getChildrenWithName((MEMBER)).hasNext()){
String msg =
"Invalid Synapse configuration. " +
- "loadbalanceEndpoint element cannot have both member & endpoint " +
"child elements";
log.error(msg);
throw new SynapseException(msg);
@@ -106,16 +105,17 @@
log.error(msg);
throw new SynapseException(msg);
}
-// TODO FIX-RUWAN
-// List<Member> members = getMembers(loadbalanceElement);
-// loadbalanceEndpoint.setMembers(members);
-// algorithm =
-// LoadbalanceAlgorithmFactory.
-// createLoadbalanceAlgorithm2(loadbalanceElement, members);
-// loadbalanceEndpoint.startApplicationMembershipTimer();
+
+ List<Member> members = getMembers(loadbalanceElement);
+ loadbalanceEndpoint.setMembers(members);
+ algorithm =
+ LoadbalanceAlgorithmFactory.
+ createLoadbalanceAlgorithm2(loadbalanceElement, members);
+ loadbalanceEndpoint.startApplicationMembershipTimer();
}
- if (loadbalanceEndpoint.getChildren() == null) {
+ if (loadbalanceEndpoint.getChildren() == null &&
+ loadbalanceEndpoint.getMembers() == null) {
String msg = "Invalid Synapse configuration.\n"
+ "A LoadbalanceEndpoint must have child elements, but the LoadbalanceEndpoint "
+ "'" + loadbalanceEndpoint.getName() + "' does not have any child elements.";
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java?rev=915237&r1=915236&r2=915237&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java Tue Feb 23 08:20:06 2010
@@ -27,6 +27,7 @@
import org.apache.synapse.config.xml.XMLConfigConstants;
import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
import org.apache.synapse.endpoints.algorithms.RoundRobin;
+import org.apache.axis2.clustering.Member;
import javax.xml.namespace.QName;
import java.util.List;
@@ -81,4 +82,11 @@
return algorithm;
}
+
+ /**
+  * Builds a load balance algorithm for member based load balancing. Delegates to
+  * createLoadbalanceAlgorithm and then sets the given application members on the result.
+  *
+  * @param loadbalanceElement the element handed on to createLoadbalanceAlgorithm
+  * @param members            the application members to set on the created algorithm
+  * @return the created algorithm, with its application members set
+  */
+ public static LoadbalanceAlgorithm createLoadbalanceAlgorithm2(OMElement loadbalanceElement,
+                                                                List<Member> members) {
+     // The second argument of createLoadbalanceAlgorithm is deliberately passed as null;
+     // only the application members are configured here.
+     final LoadbalanceAlgorithm memberAlgorithm =
+             createLoadbalanceAlgorithm(loadbalanceElement, null);
+     memberAlgorithm.setApplicationMembers(members);
+     return memberAlgorithm;
+ }
}
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?rev=915237&r1=915236&r2=915237&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java Tue Feb 23 08:20:06 2010
@@ -20,14 +20,25 @@
package org.apache.synapse.endpoints;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.Member;
import org.apache.synapse.FaultHandler;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import java.net.*;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.Timer;
+import java.util.ArrayList;
+import java.io.IOException;
+
/**
* A Load balance endpoint contains multiple child endpoints. It routes messages according to the
* specified load balancing algorithm. This will assume that all immediate child endpoints are
@@ -46,6 +57,17 @@
/** The algorithm context to hold runtime state related to the load balance algorithm */
private AlgorithmContext algorithmContext = null;
+ /**
+ * List of currently available application members amongst which the load is distributed
+ */
+ private List<Member> activeMembers = null;
+
+ /**
+ * List of currently unavailable members
+ */
+ private List<Member> inactiveMembers = null;
+
+
@Override
public void init(SynapseEnvironment synapseEnvironment) {
ConfigurationContext cc =
@@ -64,7 +86,10 @@
log.debug("Sending using Load-balance " + toString());
}
- Endpoint endpoint = getNextChild(synCtx);
+ Endpoint endpoint = null;
+ if (activeMembers == null) {
+ endpoint = getNextChild(synCtx);
+ }
if (endpoint != null) {
// if this is not a retry
@@ -83,6 +108,13 @@
synCtx.pushFaultHandler(this);
endpoint.send(synCtx);
+ } else if (activeMembers != null && !activeMembers.isEmpty()) {
+ EndpointReference to = synCtx.getTo();
+ LoadbalanceFaultHandler faultHandler = new LoadbalanceFaultHandler(to);
+ if (failover) {
+ synCtx.pushFaultHandler(faultHandler);
+ }
+ sendToApplicationMember(synCtx, to, faultHandler);
} else {
// if this is not a retry
informFailure(synCtx, SynapseConstants.ENDPOINT_LB_NONE_READY,
@@ -90,6 +122,54 @@
}
}
+ /**
+  * Asks the load balance algorithm for the next application member, rewrites the 'To'
+  * address of the message so that it points at that member, and sends the message.
+  *
+  * Only the http and https incoming transports are handled; for any other transport an
+  * error is logged and the message is not sent.
+  *
+  * @param synCtx       the message to be sent
+  * @param to           the original 'To' address; when it contains a ':' it is parsed as
+  *                     a URL and only its path is kept
+  * @param faultHandler the fault handler that is told which member was chosen
+  * @throws SynapseException if the address is a malformed URL, or if the algorithm
+  *                          returns no member
+  */
+ private void sendToApplicationMember(MessageContext synCtx,
+                                      EndpointReference to,
+                                      LoadbalanceFaultHandler faultHandler) {
+     org.apache.axis2.context.MessageContext axis2MsgCtx =
+             ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+
+     String transport = axis2MsgCtx.getTransportIn().getName();
+     algorithm.setApplicationMembers(activeMembers);
+     Member currentMember = algorithm.getNextApplicationMember(algorithmContext);
+     faultHandler.setCurrentMember(currentMember);
+
+     if (currentMember == null) {
+         // The LoadbalanceFaultHandler is pushed only when failover is enabled, so it
+         // must only be removed in that case; otherwise an unrelated handler is popped.
+         if (failover) {
+             synCtx.getFaultStack().pop();
+         }
+         String msg = "No application members available";
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+
+     // Constant-first comparison keeps a null transport name from raising an NPE
+     boolean secure = "https".equals(transport);
+     if (!secure && !"http".equals(transport)) {
+         log.error("Cannot load balance for non-HTTP/S transport " + transport);
+         return;
+     }
+
+     // URL rewrite
+     String address = to.getAddress();
+     if (address.indexOf(":") != -1) {
+         try {
+             address = new URL(address).getPath();
+         } catch (MalformedURLException e) {
+             String msg = "URL " + address + " is malformed";
+             log.error(msg, e);
+             throw new SynapseException(msg, e);
+         }
+     }
+
+     // Use the port that matches the scheme: an https address must not be built
+     // with the member's plain HTTP port.
+     int port = secure ? currentMember.getHttpsPort() : currentMember.getHttpPort();
+     EndpointReference epr =
+             new EndpointReference(transport + "://" + currentMember.getHostName() +
+                     ":" + port + address);
+     synCtx.setTo(epr);
+     if (failover) {
+         // presumably so that the message can be sent again to another member - confirm
+         synCtx.getEnvelope().build();
+     }
+
+     AddressEndpoint endpoint = new AddressEndpoint();
+     EndpointDefinition definition = new EndpointDefinition();
+     endpoint.setDefinition(definition);
+     endpoint.send(synCtx);
+ }
+
/**
* If this endpoint is in inactive state, checks if all immediate child endpoints are still
* failed. If so returns false. If at least one child endpoint is in active state, sets this
@@ -157,4 +237,109 @@
protected Endpoint getNextChild(MessageContext synCtx) {
return algorithm.getNextEndpoint(synCtx, algorithmContext);
}
+
+ /**
+  * Failover support for member based load balancing: when sending to a member fails,
+  * that member is moved to the inactive list and the message is sent again through
+  * sendToApplicationMember.
+  */
+ private class LoadbalanceFaultHandler extends FaultHandler {
+
+     /** The 'To' address given at construction time; reused for every resend */
+     private final EndpointReference to;
+
+     /** The member the message was most recently sent to; may be null */
+     private Member currentMember;
+
+     private LoadbalanceFaultHandler(EndpointReference to) {
+         this.to = to;
+     }
+
+     public void setCurrentMember(Member currentMember) {
+         this.currentMember = currentMember;
+     }
+
+     /**
+      * Inactivates the member that failed and retries the send. Does nothing when no
+      * current member has been recorded.
+      *
+      * @param synCtx the message whose delivery failed
+      */
+     public void onFault(MessageContext synCtx) {
+         if (currentMember != null) {
+             // Re-register this handler so that a failure of the retry is handled as well
+             synCtx.pushFaultHandler(this);
+
+             // This member has to be inactivated
+             final Member failedMember = currentMember;
+             activeMembers.remove(failedMember);
+             inactiveMembers.add(failedMember);
+
+             sendToApplicationMember(synCtx, to, this);
+         }
+     }
+ }
+
+ /**
+  * Sets the application members amongst which the load is distributed. The given list
+  * becomes the list of active members, and the list of inactive members is reset to a
+  * new empty list.
+  *
+  * @param members the members to treat as active
+  */
+ public void setMembers(List<Member> members) {
+     inactiveMembers = new ArrayList<Member>();
+     activeMembers = members;
+ }
+
+ /**
+  * Returns the list of currently active members. This is the internal list itself, not
+  * a copy, and it is null until setMembers has been called.
+  *
+  * @return the active members, possibly null
+  */
+ public List<Member> getMembers() {
+     return activeMembers;
+ }
+
+ /**
+  * Starts a timer that runs a MemberActivatorTask at a fixed rate: the first run happens
+  * after one second and subsequent runs are scheduled every 500 milliseconds.
+  */
+ public void startApplicationMembershipTimer() {
+     final long initialDelayMillis = 1000;
+     final long periodMillis = 500;
+     new Timer().scheduleAtFixedRate(
+             new MemberActivatorTask(), initialDelayMillis, periodMillis);
+ }
+
+ /**
+  * The task which checks whether inactive members have become available again. Every
+  * inactive member that accepts a connection is moved back to the active list.
+  */
+ private class MemberActivatorTask extends TimerTask {
+
+     public void run() {
+         try {
+             // Iterate over a snapshot: members are removed from inactiveMembers inside
+             // the loop, and removing from the list being iterated either throws a
+             // ConcurrentModificationException or silently skips the next member.
+             for (Member member : new ArrayList<Member>(inactiveMembers)) {
+                 if (canConnect(member)) {
+                     inactiveMembers.remove(member);
+                     activeMembers.add(member);
+                 }
+             }
+         } catch (Exception ignored) {
+             // Ignore all exceptions. The timer should continue to run
+         }
+     }
+
+     /**
+      * Before activating a member, we will try to verify whether we can connect to it.
+      * Each configured port (a port of -1 is skipped) is probed, and the whole check is
+      * attempted up to 30 times.
+      *
+      * @param member The member whose connectivity needs to be verified
+      * @return true, if the member can be contacted; false, otherwise.
+      */
+     private boolean canConnect(Member member) {
+         if (log.isDebugEnabled()) {
+             log.debug("Trying to connect to member " + member.getHostName() + "...");
+         }
+         for (int retries = 30; retries > 0; retries--) {
+             try {
+                 InetAddress addr = InetAddress.getByName(member.getHostName());
+                 int httpPort = member.getHttpPort();
+                 if (log.isDebugEnabled()) {
+                     log.debug("HTTP Port=" + httpPort);
+                 }
+                 if (httpPort != -1) {
+                     probe(addr, httpPort);
+                 }
+                 int httpsPort = member.getHttpsPort();
+                 if (log.isDebugEnabled()) {
+                     log.debug("HTTPS Port=" + httpsPort);
+                 }
+                 if (httpsPort != -1) {
+                     probe(addr, httpsPort);
+                 }
+                 return true;
+             } catch (IOException e) {
+                 if (log.isDebugEnabled()) {
+                     log.debug("", e);
+                 }
+                 // Refused and timed-out connections are expected while a member is
+                 // down; anything else (including a missing message) is reported.
+                 String msg = e.getMessage();
+                 if (msg == null || (msg.indexOf("Connection refused") == -1 &&
+                         msg.indexOf("connect timed out") == -1)) {
+                     log.error("Cannot connect to member " + member, e);
+                 }
+             }
+         }
+         return false;
+     }
+
+     /**
+      * Opens a connection to the given address and port with a 10 second timeout and
+      * always closes the socket again, so that probing does not leak descriptors.
+      *
+      * @param addr the address of the member
+      * @param port the port to connect to
+      * @throws IOException if the connection cannot be established
+      */
+     private void probe(InetAddress addr, int port) throws IOException {
+         Socket socket = new Socket();
+         try {
+             socket.connect(new InetSocketAddress(addr, port), 10000);
+         } finally {
+             try {
+                 socket.close();
+             } catch (IOException ignored) {
+                 // A failure to close must not be mistaken for a failed connection
+             }
+         }
+     }
+ }
}