You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/13 22:08:34 UTC

svn commit: r667634 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/ kernel/conf/ kernel/src/org/apache/axis2/clustering/ kernel/src/org/apache/axis2/deployment/

Author: azeez
Date: Fri Jun 13 13:08:34 2008
New Revision: 667634

URL: http://svn.apache.org/viewvc?rev=667634&view=rev
Log:
Added the ability to load balance across multiple application domains


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java?rev=667634&r1=667633&r2=667634&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java Fri Jun 13 13:08:34 2008
@@ -26,9 +26,8 @@
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 /**
@@ -45,26 +44,24 @@
     protected Membership loadBalancerMembership = null;
 
     /**
-     * Represents the application group across which the load is balanced
-     */
-    protected List<Member> applicationMembers = new ArrayList<Member>();
-
-    /**
      * Represents the load balancer group
      */
     protected byte[] loadBalancerDomain = new byte[0];
 
     /**
-     * Represents the group in which the applications being load balanced, are deployed
+     * Represents the group in which the applications being load balanced, are deployed and their
+     * respective load balancer event handlers
      */
-    protected byte[] applicationDomain = new byte[0];
-
-    private LoadBalanceEventHandler eventHandler;
+    private Map<byte[], LoadBalanceEventHandler> lbEventHandlers;
 
     public LoadBalancerInterceptor(byte[] loadBalancerDomain,
-                                   byte[] applicationDomain) {
+                                   Map<byte[], LoadBalanceEventHandler> lbEventHandlers) {
         this.loadBalancerDomain = loadBalancerDomain;
-        this.applicationDomain = applicationDomain;
+        this.lbEventHandlers = lbEventHandlers;
+    }
+
+    public void setLbEventHandlers(Map<byte[], LoadBalanceEventHandler> lbEventHandlers) {
+        this.lbEventHandlers = lbEventHandlers;
     }
 
     public void messageReceived(ChannelMessage msg) {
@@ -92,22 +89,25 @@
         }
 
         // Is this an application domain member?
-        if (Arrays.equals(applicationDomain, member.getDomain())) {
-            log.info("Application member " + TribesUtil.getName(member) + " joined cluster");
-            if (eventHandler != null) {
-                org.apache.axis2.clustering.Member axis2Member =
-                        new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
-                                                               member.getPort());
-                Properties props = getProperties(member.getPayload());
-                int httpPort = Integer.parseInt(props.getProperty("HTTP"));
-                int httpsPort = Integer.parseInt(props.getProperty("HTTPS"));
-                axis2Member.setHttpPort(httpPort);
-                axis2Member.setHttpsPort(httpsPort);
-                eventHandler.applicationMemberAdded(axis2Member);
+        for (byte[] applicationDomain : lbEventHandlers.keySet()) {
+            if (Arrays.equals(applicationDomain, member.getDomain())) {
+                log.info("Application member " + TribesUtil.getName(member) + " joined group " +
+                         new String(applicationDomain));
+                LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
+                if (eventHandler != null) {
+                    org.apache.axis2.clustering.Member axis2Member =
+                            new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
+                                                                   member.getPort());
+                    Properties props = getProperties(member.getPayload());
+                    int httpPort = Integer.parseInt(props.getProperty("HTTP"));
+                    int httpsPort = Integer.parseInt(props.getProperty("HTTPS"));
+                    axis2Member.setHttpPort(httpPort);
+                    axis2Member.setHttpsPort(httpsPort);
+                    eventHandler.applicationMemberAdded(axis2Member);
+                }
+                break;
             }
-            applicationMembers.add(member);
         }
-
     }
 
     private Properties getProperties(byte[] payload) {
@@ -136,14 +136,17 @@
         }
 
         // Is this an application domain member?
-        if (Arrays.equals(applicationDomain, member.getDomain())) {
-            log.info("Application member " + TribesUtil.getName(member) + " left cluster");
-            if (eventHandler != null) {
-                org.apache.axis2.clustering.Member axis2Member =
-                        new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
-                                                               member.getPort());
-                eventHandler.applicationMemberRemoved(axis2Member);
-                applicationMembers.remove(member);
+        for (byte[] applicationDomain : lbEventHandlers.keySet()) {
+            if (Arrays.equals(applicationDomain, member.getDomain())) {
+                log.info("Application member " + TribesUtil.getName(member) + " left group " +
+                         new String(applicationDomain));
+                LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
+                if (eventHandler != null) {
+                    org.apache.axis2.clustering.Member axis2Member =
+                            new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
+                                                                   member.getPort());
+                    eventHandler.applicationMemberRemoved(axis2Member);
+                }
             }
         }
     }
@@ -178,24 +181,4 @@
             loadBalancerMembership = new Membership((MemberImpl) super.getLocalMember(true));
         }
     }
-
-    public byte[] getApplicationDomain() {
-        return applicationDomain;
-    }
-
-    public void setApplicationDomain(byte[] applicationDomain) {
-        this.applicationDomain = applicationDomain;
-    }
-
-    public byte[] getLoadBalancerDomain() {
-        return loadBalancerDomain;
-    }
-
-    public void setLoadBalancerDomain(byte[] loadBalancerDomain) {
-        this.loadBalancerDomain = loadBalancerDomain;
-    }
-
-    public void setEventHandler(LoadBalanceEventHandler eventHandler) {
-        this.eventHandler = eventHandler;
-    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=667634&r1=667633&r2=667634&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri Jun 13 13:08:34 2008
@@ -78,6 +78,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Properties;
+import java.util.Map;
 
 /**
  * The main ClusterManager class for the Tribes based clustering implementation
@@ -99,7 +100,11 @@
     private RpcRequestHandler rpcRequestHandler;
     private StaticMembershipInterceptor staticMembershipInterceptor;
     private List<org.apache.axis2.clustering.Member> members;
-    private LoadBalanceEventHandler lbEventHandler;
+
+    private Map<byte[], LoadBalanceEventHandler> lbEventHandlers =
+            new HashMap<byte[], LoadBalanceEventHandler>();
+    private LoadBalancerInterceptor lbInterceptor;
+    private boolean loadBalanceMode;
 
     public TribesClusterManager() {
         parameters = new HashMap<String, Parameter>();
@@ -113,8 +118,12 @@
         return members;
     }
 
-    public LoadBalanceEventHandler getLoadBalanceEventHandler() {
-        return lbEventHandler;
+    public void addLoadBalanceEventHandler(LoadBalanceEventHandler eventHandler,
+                                           String applicationDomain) {
+        log.info("Load balancing for application domain " + applicationDomain +
+                 " using event handler " + eventHandler.getClass());
+        lbEventHandlers.put(applicationDomain.getBytes(), eventHandler);
+        loadBalanceMode = true;
     }
 
     public ContextManager getContextManager() {
@@ -144,20 +153,11 @@
         String membershipScheme = getMembershipScheme();
         log.info("Using " + membershipScheme + " based membership management scheme");
 
-        String mode = getMode();
-        log.info("Operating in " + mode + " mode");
-
         byte[] domain = getClusterDomain();
         log.info("Cluster domain: " + new String(domain));
 
-        byte[] applicationDomain = new byte[0];
-        if (mode.equals(ClusteringConstants.Mode.LOAD_BALANCE)) {
-            applicationDomain = getApplicationDomain();
-            log.info("Application domain: " + new String(applicationDomain));
-        }
-
         // Add all the ChannelInterceptors
-        addInterceptors(channel, domain, applicationDomain, membershipScheme, mode);
+        addInterceptors(channel, domain, membershipScheme);
 
         // Membership scheme handling
         // If it is a WKA scheme, connect to a WKA and get a list of members. Add the members
@@ -325,31 +325,6 @@
     }
 
     /**
-     * Get the mode in which this member is running
-     *
-     * @return The member mode. Only "application" & "loadBalance" are valid return values.
-     * @throws ClusteringFault If an invalid membershipScheme has been specified in the axis2.xml
-     *                         file
-     */
-    private String getMode() throws ClusteringFault {
-        Parameter modeParam =
-                getParameter(ClusteringConstants.Parameters.MODE);
-        String mode = ClusteringConstants.Mode.APPLICATION;
-        if (modeParam != null) {
-            mode = ((String) modeParam.getValue()).trim();
-        }
-        if (!mode.equals(ClusteringConstants.Mode.APPLICATION) &&
-            !mode.equals(ClusteringConstants.Mode.LOAD_BALANCE)) {
-            String msg = "Invalid mode '" + mode + "'. Supported modes are " +
-                         ClusteringConstants.Mode.APPLICATION + " & " +
-                         ClusteringConstants.Mode.LOAD_BALANCE;
-            log.error(msg);
-            throw new ClusteringFault(msg);
-        }
-        return mode;
-    }
-
-    /**
      * Get the clustering domain to which this node belongs to
      *
      * @return The clustering domain to which this node belongs to
@@ -366,24 +341,6 @@
     }
 
     /**
-     * Get the clustering application domain to which this node belongs to. A valid domain
-     * will be returned only of this member is running in loadBalance mode.
-     *
-     * @return The clustering application domain to which this node belongs to. A valid domain
-     *         will be returned only of this member is running in loadBalance mode.
-     */
-    private byte[] getApplicationDomain() {
-        Parameter domainParam = getParameter(ClusteringConstants.Parameters.APPLICATION_DOMAIN);
-        byte[] domain;
-        if (domainParam != null) {
-            domain = ((String) domainParam.getValue()).getBytes();
-        } else {
-            domain = ClusteringConstants.DEFAULT_APP_DOMAIN.getBytes();
-        }
-        return domain;
-    }
-
-    /**
      * Set the maximum number of retries, if message sending to a particular node fails
      */
     private void setMaximumRetries() {
@@ -640,15 +597,12 @@
      *
      * @param channel           The Tribes channel
      * @param domain            The loadBalancerDomain to which this node belongs to
-     * @param applicationDomain The application domain, across which the load is balanced
      * @param membershipScheme  The membership scheme. Only wka & multicast are valid values.
-     * @param mode              The mode in which this member operates such as "loadBalance" or
-     *                          "application"
      * @throws ClusteringFault If an error occurs while adding interceptors
      */
     private void addInterceptors(ManagedChannel channel,
-                                 byte[] domain, byte[] applicationDomain,
-                                 String membershipScheme, String mode) throws ClusteringFault {
+                                 byte[] domain,
+                                 String membershipScheme) throws ClusteringFault {
 
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
@@ -676,28 +630,13 @@
 
         // Add a DomainFilterInterceptor
         channel.getMembershipService().setDomain(domain);
-        if (mode.equals(ClusteringConstants.Mode.APPLICATION)) {
+        if (!loadBalanceMode) {
             DomainFilterInterceptor dfi = new DomainFilterInterceptor();
             dfi.setDomain(domain);
             channel.addInterceptor(dfi);
-        } else if (mode.equals(ClusteringConstants.Mode.LOAD_BALANCE)) {
-            LoadBalancerInterceptor lbInterceptor =
-                    new LoadBalancerInterceptor(domain, applicationDomain);
-            Parameter lbEvtHandlerParam =
-                    getParameter(ClusteringConstants.Parameters.LOAD_BALANCE_EVENT_HANDLER);
-            if (lbEvtHandlerParam != null && lbEvtHandlerParam.getValue() != null) {
-                String lbEvtHandlerClass = ((String) lbEvtHandlerParam.getValue()).trim();
-                try {
-                    lbEventHandler =
-                            (LoadBalanceEventHandler) Class.forName(lbEvtHandlerClass).newInstance();
-                    lbInterceptor.setEventHandler(lbEventHandler);
-                } catch (Exception e) {
-                    String msg = "Could not instantiate LoadBalanceEventHandler class " +
-                                 lbEvtHandlerClass;
-                    log.error(msg, e);
-                    throw new ClusteringFault(msg, e);
-                }
-            }
+        } else {
+            lbInterceptor =
+                    new LoadBalancerInterceptor(domain, lbEventHandlers);
             channel.addInterceptor(lbInterceptor);
         }
 
@@ -835,11 +774,9 @@
                             }
                         }
                         // TODO: If we do not get a response within some time, try to recover from this fault
-                    } while (responses.length == 0);
-                    if (responses.length > 0) {
-                        ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
-                        break;
-                    }
+                    } while (responses.length == 0 || responses[0] == null || responses[0].getMessage() == null);
+                    ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+                    break;
                 }
             } catch (Exception e) {
                 log.error("Cannot get initialization information from " +

Modified: webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml?rev=667634&r1=667633&r2=667634&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml (original)
+++ webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml Fri Jun 13 13:08:34 2008
@@ -280,25 +280,6 @@
         <parameter name="domain">apache.axis2.domain</parameter>
 
         <!--
-         Indicates the mode in which this member is running. Valid values are "application" and
-         "loadBalance"
-
-         application - This member hosts end user applications
-         loadBalance - This member is a part of the load balancer cluster
-        -->
-        <parameter name="mode">application</parameter>
-
-        <!--
-        This is the even handler which will be notified in the case of load balancing events occurring.
-        This class has to be an implementation of org.apache.axis2.clustering.LoadBalanceEventHandler
-
-        This entry is only valid if the "mode" parameter is set to loadBalance
-        -->
-        <parameter name="loadBalanceEventHandler">
-            org.apache.axis2.clustering.DefaultLoadBalanceEventHandler
-        </parameter>
-
-        <!--
         This parameter is only valid when the "mode" parameter is set to application. This indicates
         the domain in which the the applications being load balanced are deployed.
         -->
@@ -365,6 +346,16 @@
         </members>
 
         <!--
+        Enable the load balancer entry if you need to run this node as a load balancer.
+        Multiple application domains with different LoadBalanceEventHandler implementations
+        can be defined in this section.
+        -->
+        <loadBalancer enable="false">
+            <applicationDomain name="apache.axis2.application.domain"
+                               handler="org.apache.axis2.clustering.DefaultLoadBalanceEventHandler"/>
+        </loadBalancer>
+
+        <!--
            This interface is responsible for handling configuration management.
            Configuraion changes include:
 

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=667634&r1=667633&r2=667634&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri Jun 13 13:08:34 2008
@@ -151,10 +151,13 @@
     List<Member> getMembers();
 
     /**
-     * Get the load balance event handler which will be notified when load balance events occur.
+     * Set the load balance event handler which will be notified when load balance events occur.
      * This will be valid only when this node is running in loadBalance mode
      *
-     * @return The load balance event handler
+     * @param eventHandler      The load balance event handler
+     * @param applicationDomain The application domain which is handled by the eventHandler
      */
-    LoadBalanceEventHandler getLoadBalanceEventHandler();
+    void addLoadBalanceEventHandler(LoadBalanceEventHandler eventHandler, String applicationDomain);
+
+
 }
\ No newline at end of file

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?rev=667634&r1=667633&r2=667634&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Fri Jun 13 13:08:34 2008
@@ -34,11 +34,6 @@
      */
     public static final String DEFAULT_DOMAIN = "apache.axis2.domain";
 
-    /**
-     * The default application domain to which this member belongs to
-     */
-    public static final String DEFAULT_APP_DOMAIN = "apache.axis2.app.domain";
-
     public static final String NODE_MANAGER_SERVICE = "Axis2NodeManager";
     public static final String REQUEST_BLOCKING_HANDLER = "RequestBlockingHandler";
     public static final String CLUSTER_INITIALIZED = "local_cluster.initialized";
@@ -105,22 +100,6 @@
         public static final String AVOID_INITIATION = "AvoidInitiation";
     }
 
-    /**
-     * Represents the mode of this member
-     */
-    public static final class Mode {
-
-        /**
-         * Load balancing mode
-         */
-        public static final String LOAD_BALANCE = "loadBalance";
-
-        /**
-         * Application mode
-         */
-        public static final String APPLICATION = "application";
-    }
-
     public static final class MembershipScheme {
         /**
          * Multicast based membership discovery scheme

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java?rev=667634&r1=667633&r2=667634&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java Fri Jun 13 13:08:34 2008
@@ -25,6 +25,7 @@
 import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.LoadBalanceEventHandler;
 import org.apache.axis2.clustering.configuration.ConfigurationManager;
 import org.apache.axis2.clustering.configuration.ConfigurationManagerListener;
 import org.apache.axis2.clustering.context.ContextManager;
@@ -91,6 +92,9 @@
                               clusterManager,
                               null);
 
+            // loading the application domains
+            loadApplicationDomains(clusterManager, clusterElement);
+
             // loading the members
             loadMembers(clusterManager, clusterElement);
 
@@ -117,6 +121,36 @@
         return enabled;
     }
 
+    private void loadApplicationDomains(ClusterManager clusterManager,
+                                        OMElement clusterElement) throws DeploymentException {
+        OMElement lbEle = clusterElement.getFirstChildWithName(new QName("loadBalancer"));
+        if(lbEle != null){
+            if (isEnabled(lbEle)) {
+                log.info("Running in load balance mode");
+            } else {
+                log.info("Running in application mode");
+                return;
+            }
+
+            for(Iterator iter= lbEle.getChildrenWithName(new QName("applicationDomain"));
+                iter.hasNext();){
+                OMElement omElement = (OMElement) iter.next();
+                String domainName = omElement.getAttributeValue(new QName("name")).trim();
+                String handlerClass = omElement.getAttributeValue(new QName("handler")).trim();
+                LoadBalanceEventHandler eventHandler;
+                try {
+                    eventHandler = (LoadBalanceEventHandler) Class.forName(handlerClass).newInstance();
+                } catch (Exception e) {
+                    String msg = "Could not instantiate LoadBalanceEventHandler " + handlerClass +
+                                 " for domain " + domainName;
+                    log.error(msg, e);
+                    throw new DeploymentException(msg, e);
+                }
+                clusterManager.addLoadBalanceEventHandler(eventHandler, domainName);
+            }
+        }
+    }
+            
     private void loadMembers(ClusterManager clusterManager, OMElement clusterElement) {
         clusterManager.setMembers(new ArrayList<Member>());
         Parameter membershipSchemeParam = clusterManager.getParameter("membershipScheme");