You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by in...@apache.org on 2010/02/23 09:20:06 UTC

svn commit: r915237 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse: config/xml/endpoints/LoadbalanceEndpointFactory.java config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java endpoints/LoadbalanceEndpoint.java

Author: indika
Date: Tue Feb 23 08:20:06 2010
New Revision: 915237

URL: http://svn.apache.org/viewvc?rev=915237&view=rev
Log:
Adding back the member based load balancing which was removed a few months back.


Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java?rev=915237&r1=915236&r2=915237&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java Tue Feb 23 08:20:06 2010
@@ -85,7 +85,6 @@
                         getChildrenWithName((MEMBER)).hasNext()){
                     String msg =
                             "Invalid Synapse configuration. " +
-                            "loadbalanceEndpoint element cannot have both member & endpoint " +
                             "child elements";
                     log.error(msg);
                     throw new SynapseException(msg);
@@ -106,16 +105,17 @@
                     log.error(msg);
                     throw new SynapseException(msg);
                 }
-//                TODO FIX-RUWAN
-//                List<Member> members = getMembers(loadbalanceElement);
-//                loadbalanceEndpoint.setMembers(members);
-//                algorithm =
-//                        LoadbalanceAlgorithmFactory.
-//                                createLoadbalanceAlgorithm2(loadbalanceElement, members);
-//                loadbalanceEndpoint.startApplicationMembershipTimer();
+
+                List<Member> members = getMembers(loadbalanceElement);
+                loadbalanceEndpoint.setMembers(members);
+                algorithm =
+                        LoadbalanceAlgorithmFactory.
+                                createLoadbalanceAlgorithm2(loadbalanceElement, members);
+                loadbalanceEndpoint.startApplicationMembershipTimer();
             }
 
-            if (loadbalanceEndpoint.getChildren() == null) {
+            if (loadbalanceEndpoint.getChildren() == null &&
+                    loadbalanceEndpoint.getMembers() == null) {
                 String msg = "Invalid Synapse configuration.\n"
                     + "A LoadbalanceEndpoint must have child elements, but the LoadbalanceEndpoint "
                     + "'" + loadbalanceEndpoint.getName() + "' does not have any child elements.";

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java?rev=915237&r1=915236&r2=915237&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/utils/LoadbalanceAlgorithmFactory.java Tue Feb 23 08:20:06 2010
@@ -27,6 +27,7 @@
 import org.apache.synapse.config.xml.XMLConfigConstants;
 import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
 import org.apache.synapse.endpoints.algorithms.RoundRobin;
+import org.apache.axis2.clustering.Member;
 
 import javax.xml.namespace.QName;
 import java.util.List;
@@ -81,4 +82,22 @@
         
         return algorithm;
     }
+
+    /**
+     * Creates a load balance algorithm from the given element by delegating to
+     * createLoadbalanceAlgorithm, and sets the given application members on it.
+     *
+     * @param loadbalanceElement the element the algorithm is created from
+     * @param members            the application members to be set on the algorithm
+     * @return the created algorithm, with the application members set
+     */
+    public static LoadbalanceAlgorithm createLoadbalanceAlgorithm2(
+            OMElement loadbalanceElement, List<Member> members) {
+        // NOTE(review): the second argument is presumably the list of child endpoints, which
+        // does not apply to member based load balancing - confirm against the delegate
+        final LoadbalanceAlgorithm memberAlgorithm =
+                createLoadbalanceAlgorithm(loadbalanceElement, null);
+        memberAlgorithm.setApplicationMembers(members);
+        return memberAlgorithm;
+    }
 }

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?rev=915237&r1=915236&r2=915237&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java Tue Feb 23 08:20:06 2010
@@ -20,14 +20,25 @@
 package org.apache.synapse.endpoints;
 
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.Member;
 import org.apache.synapse.FaultHandler;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
 import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.core.SynapseEnvironment;
 import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
 import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
 
+import java.net.*;
+import java.util.List;
+import java.util.TimerTask;
+import java.util.Timer;
+import java.util.ArrayList;
+import java.io.IOException;
+
 /**
  * A Load balance endpoint contains multiple child endpoints. It routes messages according to the
  * specified load balancing algorithm. This will assume that all immediate child endpoints are
@@ -46,6 +57,17 @@
     /** The algorithm context to hold runtime state related to the load balance algorithm */
     private AlgorithmContext algorithmContext = null;
 
+    /**
+     * List of currently available application members amongst which the load is distributed
+     */
+    private List<Member> activeMembers = null;
+
+    /**
+     * List of currently unavailable members
+     */
+    private List<Member> inactiveMembers = null;
+
+
     @Override
     public void init(SynapseEnvironment synapseEnvironment) {
         ConfigurationContext cc =
@@ -64,7 +86,10 @@
             log.debug("Sending using Load-balance " + toString());
         }
 
-        Endpoint endpoint = getNextChild(synCtx); 
+        Endpoint endpoint = null;
+        if (activeMembers == null) {
+            endpoint = getNextChild(synCtx);
+        }
 
         if (endpoint != null) {
             // if this is not a retry
@@ -83,6 +108,13 @@
             synCtx.pushFaultHandler(this);
             endpoint.send(synCtx);
 
+        } else if (activeMembers != null && !activeMembers.isEmpty()) {
+            EndpointReference to = synCtx.getTo();
+            LoadbalanceFaultHandler faultHandler = new LoadbalanceFaultHandler(to);
+            if (failover) {
+                synCtx.pushFaultHandler(faultHandler);
+            }
+            sendToApplicationMember(synCtx, to, faultHandler);
         } else {
             // if this is not a retry
             informFailure(synCtx, SynapseConstants.ENDPOINT_LB_NONE_READY,
@@ -90,6 +122,54 @@
         }
     }
 
+    /**
+     * Sends the message to the next active member chosen by the algorithm, after rewriting
+     * the 'To' address so that it points to that member. Only HTTP and HTTPS are supported.
+     * @throws SynapseException if no member is available or the 'To' address is malformed
+     */
+    private void sendToApplicationMember(MessageContext synCtx, EndpointReference to,
+                                         LoadbalanceFaultHandler faultHandler) {
+        String transport = ((Axis2MessageContext) synCtx).getAxis2MessageContext()
+                .getTransportIn().getName();
+        algorithm.setApplicationMembers(activeMembers);
+        Member currentMember = algorithm.getNextApplicationMember(algorithmContext);
+        faultHandler.setCurrentMember(currentMember);
+        if (currentMember == null) {
+            // Remove the LoadbalanceFaultHandler, which is on the stack only if it was pushed
+            synCtx.getFaultStack().remove(faultHandler);
+            String msg = "No application members available";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+        boolean https = transport.equals("https");
+        if (!https && !transport.equals("http")) {
+            log.error("Cannot load balance for non-HTTP/S transport " + transport);
+            return;
+        }
+
+        // URL rewrite: only the path of the original address is kept
+        String address = to.getAddress();
+        if (address.indexOf(":") != -1) {
+            try {
+                address = new URL(address).getPath();
+            } catch (MalformedURLException e) {
+                String msg = "URL " + address + " is malformed";
+                log.error(msg, e);
+                throw new SynapseException(msg, e);
+            }
+        }
+        // HTTPS requests have to go to the HTTPS port of the member, not to its HTTP port
+        int port = https ? currentMember.getHttpsPort() : currentMember.getHttpPort();
+        synCtx.setTo(new EndpointReference(
+                transport + "://" + currentMember.getHostName() + ":" + port + address));
+        if (failover) {
+            synCtx.getEnvelope().build();
+        }
+        AddressEndpoint endpoint = new AddressEndpoint();
+        endpoint.setDefinition(new EndpointDefinition());
+        endpoint.send(synCtx);
+    }
+
     /**
      * If this endpoint is in inactive state, checks if all immediate child endpoints are still
      * failed. If so returns false. If at least one child endpoint is in active state, sets this
@@ -157,4 +237,159 @@
     protected Endpoint getNextChild(MessageContext synCtx) {
         return algorithm.getNextEndpoint(synCtx, algorithmContext);
     }
+
+    /**
+     * This FaultHandler will try to resend the message to another member if an error occurs
+     * while sending to some member. This is a failover mechanism
+     */
+    private class LoadbalanceFaultHandler extends FaultHandler {
+
+        /** The 'To' EPR the message had when this handler was created; reused on every retry */
+        private final EndpointReference to;
+
+        /** The member the message was last sent to, as set via setCurrentMember */
+        private Member currentMember;
+
+        private LoadbalanceFaultHandler(EndpointReference to) {
+            this.to = to;
+        }
+
+        public void setCurrentMember(Member currentMember) {
+            this.currentMember = currentMember;
+        }
+
+        /**
+         * Moves the member that just failed from the active to the inactive list and re-sends
+         * the message to another member. Does nothing if no current member has been set.
+         *
+         * @param synCtx the message context of the failed send
+         */
+        public void onFault(MessageContext synCtx) {
+            if (currentMember == null) {
+                return;
+            }
+            synCtx.pushFaultHandler(this);
+            // This member has to be inactivated. Only the thread which actually removed it from
+            // the active list adds it to the inactive list, so that it is never listed twice
+            if (activeMembers.remove(currentMember)) {
+                inactiveMembers.add(currentMember);
+            }
+            sendToApplicationMember(synCtx, to, this);
+        }
+    }
+
+    /**
+     * Sets the application members amongst which the load is distributed. All of them are
+     * initially treated as active, and the list of inactive members is reset.
+     *
+     * @param members the application members, may be null
+     */
+    public void setMembers(List<Member> members) {
+        // The lists are modified by the threads handling faults as well as by the membership
+        // timer thread, hence thread safe copies are kept instead of the list passed in
+        this.activeMembers = (members == null) ? null :
+                new java.util.concurrent.CopyOnWriteArrayList<Member>(members);
+        this.inactiveMembers = new java.util.concurrent.CopyOnWriteArrayList<Member>();
+    }
+
+    /**
+     * @return the currently active application members, null if no members have been set
+     */
+    public List<Member> getMembers() {
+        return this.activeMembers;
+    }
+
+    /**
+     * Starts the timer task which periodically checks whether inactive members have become
+     * available again. The first check runs after 1 second, the period is 500 milliseconds.
+     */
+    public void startApplicationMembershipTimer() {
+        // A daemon thread, so that this timer alone does not keep the JVM from exiting
+        Timer timer = new Timer(true);
+        // Fixed-delay execution; a fixed-rate timer would fire a burst of catch-up executions
+        // after each long running connectivity check
+        timer.schedule(new MemberActivatorTask(), 1000, 500);
+    }
+
+    /**
+     * The task which checks whether inactive members have become available again
+     */
+    private class MemberActivatorTask extends TimerTask {
+
+        public void run() {
+            try {
+                if (inactiveMembers == null) {
+                    return;
+                }
+                // Iterate over a snapshot, as members are removed from the list within the loop
+                for (Member member : new ArrayList<Member>(inactiveMembers)) {
+                    if (canConnect(member) && inactiveMembers.remove(member)) {
+                        activeMembers.add(member);
+                    }
+                }
+            } catch (Exception ignored) {
+                // Ignore all exceptions. The timer should continue to run
+            }
+        }
+
+        /**
+         * Before activating a member, we will try to verify whether we can connect to it
+         *
+         * @param member The member whose connectivity needs to be verified
+         * @return true, if the member can be contacted; false, otherwise.
+         */
+        private boolean canConnect(Member member) {
+            if (log.isDebugEnabled()) {
+                log.debug("Trying to connect to member " + member.getHostName() + "...");
+            }
+            for (int retries = 30; retries > 0; retries--) {
+                try {
+                    InetAddress addr = InetAddress.getByName(member.getHostName());
+                    int httpPort = member.getHttpPort();
+                    if (log.isDebugEnabled()) {
+                        log.debug("HTTP Port=" + httpPort);
+                    }
+                    if (httpPort != -1) {
+                        probe(addr, httpPort);
+                    }
+                    int httpsPort = member.getHttpsPort();
+                    if (log.isDebugEnabled()) {
+                        log.debug("HTTPS Port=" + httpsPort);
+                    }
+                    if (httpsPort != -1) {
+                        probe(addr, httpsPort);
+                    }
+                    return true;
+                } catch (IOException e) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("", e);
+                    }
+                    // The message of an exception may be null
+                    String msg = e.getMessage();
+                    if (msg == null || (msg.indexOf("Connection refused") == -1 &&
+                        msg.indexOf("connect timed out") == -1)) {
+                        log.error("Cannot connect to member " + member, e);
+                    }
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Opens a TCP connection to the given port and closes it again, so that connectivity
+         * checks do not leak sockets.
+         *
+         * @param addr the address of the member
+         * @param port the port to connect to
+         * @throws IOException if the connection cannot be established (timeout: 10 seconds)
+         */
+        private void probe(InetAddress addr, int port) throws IOException {
+            Socket socket = new Socket();
+            try {
+                socket.connect(new InetSocketAddress(addr, port), 10000);
+            } finally {
+                socket.close();
+            }
+        }
+    }
 }