You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/04 16:25:53 UTC

svn commit: r663119 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/ kernel/conf/ kernel/src/org/apache/axis2/clustering/

Author: azeez
Date: Wed Jun  4 07:25:53 2008
New Revision: 663119

URL: http://svn.apache.org/viewvc?rev=663119&view=rev
Log:
Adding the ability for a node to act as a Load Balancer. 
We can deploy the load balancer also in its own load balancer cluster.


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java?rev=663119&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java Wed Jun  4 07:25:53 2008
@@ -0,0 +1,161 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.ChannelMessage;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.ChannelInterceptorBase;
+import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.catalina.tribes.membership.Membership;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * This interceptor is used when this member is part of a load balancer cluster.
+ * This load balancer is responsible for load balancing across applications deployed in
+ * another group.
+ */
+public class LoadBalancerInterceptor extends ChannelInterceptorBase {
+    private static final Log log = LogFactory.getLog(LoadBalancerInterceptor.class);
+
+    /**
+     * Membership view of the load balancer group; created lazily by setupMembership()
+     */
+    protected Membership loadBalancerMembership = null;
+
+    /**
+     * Members of the application group across which the load is balanced
+     */
+    protected List<Member> applicationMembers = new ArrayList<Member>();
+
+    /**
+     * Domain of the load balancer group; member and message domains are compared against it
+     */
+    protected byte[] loadBalancerDomain = new byte[0];
+
+    /**
+     * Domain of the group in which the applications being load balanced are deployed
+     */
+    protected byte[] applicationDomain = new byte[0];
+
+    public LoadBalancerInterceptor(byte[] loadBalancerDomain,
+                                   byte[] applicationDomain) {
+        this.loadBalancerDomain = loadBalancerDomain;
+        this.applicationDomain = applicationDomain;
+    }
+
+    public void messageReceived(ChannelMessage msg) {
+
+        // Pass on only messages addressed with the load balancer domain; drop all others
+        if (Arrays.equals(msg.getAddress().getDomain(), loadBalancerDomain)) {
+            super.messageReceived(msg);
+        }
+    }
+
+    public void memberAdded(Member member) {
+        if (loadBalancerMembership == null) {
+            setupMembership();
+        }
+        boolean notify; // whether to pass the event on via super.memberAdded
+        synchronized (loadBalancerMembership) {
+            notify = Arrays.equals(loadBalancerDomain, member.getDomain());
+            if (notify) { // only load balancer domain members reach the membership
+                notify = loadBalancerMembership.memberAlive((MemberImpl) member);
+            }
+        }
+        if (notify) {
+            super.memberAdded(member);
+        }
+
+        // Independently of the above, record members that belong to the application domain
+        if (Arrays.equals(applicationDomain, member.getDomain())) {
+            log.info("Application member " + TribesUtil.getHost(member) + " joined cluster");
+            applicationMembers.add(member);
+        }
+
+    }
+
+    public void memberDisappeared(Member member) {
+        if (loadBalancerMembership == null) {
+            setupMembership();
+        }
+        boolean notify; // whether to pass the event on via super.memberDisappeared
+        synchronized (loadBalancerMembership) {
+            notify = Arrays.equals(loadBalancerDomain, member.getDomain());
+            loadBalancerMembership.removeMember((MemberImpl) member); // tried for any domain
+        }
+        if (notify) {
+            super.memberDisappeared(member);
+        }
+
+        // Independently of the above, drop members that belong to the application domain
+        if (Arrays.equals(applicationDomain, member.getDomain())) {
+            log.info("Application member " + TribesUtil.getHost(member) + " left cluster");
+            applicationMembers.remove(member);
+        }
+    }
+
+    public boolean hasMembers() {
+        if (loadBalancerMembership == null) {
+            setupMembership();
+        }
+        return loadBalancerMembership.hasMembers();
+    }
+
+    public Member[] getMembers() {
+        if (loadBalancerMembership == null) {
+            setupMembership();
+        }
+        return loadBalancerMembership.getMembers();
+    }
+
+    public Member getMember(Member mbr) {
+        if (loadBalancerMembership == null) {
+            setupMembership();
+        }
+        return loadBalancerMembership.getMember(mbr);
+    }
+
+    public Member getLocalMember(boolean incAlive) {
+        return super.getLocalMember(incAlive); // plain delegation to the base class
+    }
+
+    protected synchronized void setupMembership() {
+        if (loadBalancerMembership == null) { // re-checked while holding the lock on this
+            loadBalancerMembership = new Membership((MemberImpl) super.getLocalMember(true));
+        }
+    }
+
+    public byte[] getApplicationDomain() {
+        return applicationDomain;
+    }
+
+    public void setApplicationDomain(byte[] applicationDomain) {
+        this.applicationDomain = applicationDomain;
+    }
+
+    public byte[] getLoadBalancerDomain() {
+        return loadBalancerDomain;
+    }
+
+    public void setLoadBalancerDomain(byte[] loadBalancerDomain) {
+        this.loadBalancerDomain = loadBalancerDomain;
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=663119&r1=663118&r2=663119&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun  4 07:25:53 2008
@@ -133,15 +133,28 @@
                 new ChannelListener(configurationContext, configurationManager, contextManager);
 
         setMaximumRetries();
-        byte[] domain = getClusterDomain();
+
         String membershipScheme = getMembershipScheme();
+        log.info("Using " + membershipScheme + " based membership management scheme");
+
+        String mode = getMode();
+        log.info("Operating in " + mode + " mode");
+
+        byte[] domain = getClusterDomain();
+        log.info("Cluster domain: " + new String(domain));
+
+        byte[] applicationDomain = new byte[0];
+        if (mode.equals(ClusteringConstants.Mode.LOAD_BALANCE)) {
+            applicationDomain = getApplicationDomain();
+            log.info("Application domain: " + new String(applicationDomain));
+        }
 
         // Add all the ChannelInterceptors
-        addInterceptors(channel, domain, membershipScheme);
+        addInterceptors(channel, domain, applicationDomain, membershipScheme, mode);
 
         // Membership scheme handling
         // If it is a WKA scheme, connect to a WKA and get a list of members. Add the members
-        // to the membership manager
+        // to the MembershipManager
         configureMembershipScheme(domain, membershipScheme);
 
         channel.addChannelListener(channelListener);
@@ -254,15 +267,49 @@
      * Get the membership scheme applicable to this cluster
      *
      * @return The membership scheme. Only "wka" & "multicast" are valid return values.
+     * @throws ClusteringFault If the membershipScheme specified in the axis2.xml file is invalid
      */
-    private String getMembershipScheme() {
+    private String getMembershipScheme() throws ClusteringFault {
         Parameter membershipSchemeParam =
                 getParameter(ClusteringConstants.Parameters.MEMBERSHIP_SCHEME);
-        String membershipScheme = ClusteringConstants.MembershipScheme.MULTICAST_BASED;
+        String mbrScheme = ClusteringConstants.MembershipScheme.MULTICAST_BASED;
         if (membershipSchemeParam != null) {
-            membershipScheme = ((String) membershipSchemeParam.getValue()).trim();
+            mbrScheme = ((String) membershipSchemeParam.getValue()).trim();
+        }
+        if (!mbrScheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED) &&
+            !mbrScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+            String msg = "Invalid membership scheme '" + mbrScheme + "'. Supported schemes are " +
+                         ClusteringConstants.MembershipScheme.MULTICAST_BASED + " & " +
+                         ClusteringConstants.MembershipScheme.WKA_BASED;
+            log.error(msg);
+            throw new ClusteringFault(msg);
+        }
+        return mbrScheme;
+    }
+
+    /**
+     * Get the mode in which this member is running
+     *
+     * @return The member mode. Only "application" & "loadBalance" are valid return values.
+     * @throws ClusteringFault If an invalid mode has been specified in the axis2.xml
+     *                         file
+     */
+    private String getMode() throws ClusteringFault {
+        Parameter modeParam =
+                getParameter(ClusteringConstants.Parameters.MODE);
+        String mode = ClusteringConstants.Mode.APPLICATION;
+        if (modeParam != null) {
+            mode = ((String) modeParam.getValue()).trim();
+        }
+        if (!mode.equals(ClusteringConstants.Mode.APPLICATION) &&
+            !mode.equals(ClusteringConstants.Mode.LOAD_BALANCE)) {
+            String msg = "Invalid mode '" + mode + "'. Supported modes are " +
+                         ClusteringConstants.Mode.APPLICATION + " & " +
+                         ClusteringConstants.Mode.LOAD_BALANCE;
+            log.error(msg);
+            throw new ClusteringFault(msg);
         }
-        return membershipScheme;
+        return mode;
     }
 
     /**
@@ -282,6 +329,24 @@
     }
 
     /**
+     * Get the clustering application domain, i.e. the domain of the application group. The
+     * caller in this class reads it only if this member is running in loadBalance mode.
+     *
+     * @return The value of the "applicationDomain" parameter as bytes, or the bytes of
+     *         ClusteringConstants.DEFAULT_APP_DOMAIN if that parameter is not specified.
+     */
+    private byte[] getApplicationDomain() {
+        Parameter domainParam = getParameter(ClusteringConstants.Parameters.APPLICATION_DOMAIN);
+        byte[] domain;
+        if (domainParam != null) {
+            domain = ((String) domainParam.getValue()).getBytes(); // platform default charset
+        } else {
+            domain = ClusteringConstants.DEFAULT_APP_DOMAIN.getBytes();
+        }
+        return domain;
+    }
+
+    /**
      * Set the maximum number of retries, if message sending to a particular node fails
      */
     private void setMaximumRetries() {
@@ -350,7 +415,7 @@
     /**
      * Handle specific configurations related to different membership management schemes.
      *
-     * @param domain           The clustering domain to which this member belongs to
+     * @param domain           The clustering domain to which this member belongs
      * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
      * @throws ClusteringFault If the membership scheme is invalid, or if an error occurs
      *                         while configuring membership scheme
@@ -359,10 +424,8 @@
             throws ClusteringFault {
 
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
-            log.info("Using WKA based membership management scheme");
             configureWkaBasedMembership(domain);
         } else if (membershipScheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
-            log.info("Using multicast based membership management scheme");
             configureMulticastBasedMembership(channel, domain);
         } else {
             String msg = "Invalid membership scheme '" + membershipScheme +
@@ -375,7 +438,7 @@
     /**
      * Configure the membership related to the WKA based scheme
      *
-     * @param domain The domain to which the members belong to
+     * @param domain The domain to which the members belong
      * @throws ClusteringFault If an error occurs while configuring this scheme
      */
     private void configureWkaBasedMembership(byte[] domain) throws ClusteringFault {
@@ -538,13 +601,17 @@
      * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
      * membership management scheme
      *
-     * @param channel          The Tribes channel
-     * @param domain           The domain to which this node belongs to
-     * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
+     * @param channel           The Tribes channel
+     * @param domain            The domain to which this node belongs
+     * @param applicationDomain The application domain, across which the load is balanced
+     * @param membershipScheme  The membership scheme. Only wka & multicast are valid values.
+     * @param mode              The mode in which this member operates such as "loadBalance" or
+     *                          "application"
      * @throws ClusteringFault If an error occurs while adding interceptors
      */
-    private void addInterceptors(ManagedChannel channel, byte[] domain, String membershipScheme)
-            throws ClusteringFault {
+    private void addInterceptors(ManagedChannel channel,
+                                 byte[] domain, byte[] applicationDomain,
+                                 String membershipScheme, String mode) throws ClusteringFault {
 
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
@@ -552,12 +619,6 @@
             channel.addInterceptor(tcpPingInterceptor);
         }
 
-        // Add a DomainFilterInterceptor
-        channel.getMembershipService().setDomain(domain);
-        DomainFilterInterceptor dfi = new DomainFilterInterceptor();
-        dfi.setDomain(domain);
-        channel.addInterceptor(dfi);
-
         // Add the NonBlockingCoordinator. This is used for leader election
         /*nbc = new NonBlockingCoordinator() {
             public void fireInterceptorEvent(InterceptorEvent event) {
@@ -569,6 +630,25 @@
         nbc.setPrevious(dfi);
         channel.addInterceptor(nbc);*/
 
+        // Add a reliable failure detector
+        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+//        tcpFailureDetector.setPrevious(dfi); //TODO: check this
+//        tcpFailureDetector.setReadTestTimeout(30000);
+        tcpFailureDetector.setConnectTimeout(30000);
+        channel.addInterceptor(tcpFailureDetector);
+
+        // Add a DomainFilterInterceptor
+        if (mode.equals(ClusteringConstants.Mode.APPLICATION)) {
+            channel.getMembershipService().setDomain(domain);
+            DomainFilterInterceptor dfi = new DomainFilterInterceptor();
+            dfi.setDomain(domain);
+            channel.addInterceptor(dfi);
+        } else if (mode.equals(ClusteringConstants.Mode.LOAD_BALANCE)) {
+            LoadBalancerInterceptor lbInterceptor =
+                    new LoadBalancerInterceptor(domain, applicationDomain);
+            channel.addInterceptor(lbInterceptor);
+        }
+
         // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
         AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
         channel.addInterceptor(atMostOnceInterceptor);
@@ -578,13 +658,6 @@
         orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
         channel.addInterceptor(orderInterceptor);
 
-        // Add a reliable failure detector
-        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-        tcpFailureDetector.setPrevious(dfi);
-        tcpFailureDetector.setReadTestTimeout(30000);
-        tcpFailureDetector.setConnectTimeout(30000);
-        channel.addInterceptor(tcpFailureDetector);
-
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             staticMembershipInterceptor = new StaticMembershipInterceptor();
             channel.addInterceptor(staticMembershipInterceptor);
@@ -596,7 +669,7 @@
      * parameters
      *
      * @param channel The Tribes channel
-     * @param domain  The clustering domain to which this node belongs to
+     * @param domain  The clustering domain to which this node belongs
      * @throws ClusteringFault If an error occurs while obtaining the local host address
      */
     private void configureMulticastBasedMembership(ManagedChannel channel,
@@ -716,7 +789,7 @@
                         break;
                     }
                 }
-            } catch (ChannelException e) {
+            } catch (Exception e) {
                 log.error("Cannot get initialization information from " +
                           memberHost + ". Will retry in 2 secs.", e);
                 sentMembersList.add(memberHost);

Modified: webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml?rev=663119&r1=663118&r2=663119&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml (original)
+++ webservices/axis2/trunk/java/modules/kernel/conf/axis2.xml Wed Jun  4 07:25:53 2008
@@ -280,6 +280,21 @@
         <parameter name="domain">apache.axis2.domain</parameter>
 
         <!--
+         Indicates the mode in which this member is running. Valid values are "application" and
+         "loadBalance"
+
+         application - This member hosts end user applications
+         loadBalance - This member is a part of the load balancer cluster
+        -->
+        <parameter name="mode">application</parameter>
+
+        <!--
+        This parameter is only valid when the "mode" parameter is set to loadBalance. This indicates
+        the domain in which the applications being load balanced are deployed.
+        -->
+        <parameter name="applicationDomain">apache.axis2.application.domain</parameter>
+
+        <!--
            When a Web service request is received, and processed, before the response is sent to the
            client, should we update the states of all members in the cluster? If the value of
            this parameter is set to "true", the response to the client will be sent only after

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java?rev=663119&r1=663118&r2=663119&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusteringConstants.java Wed Jun  4 07:25:53 2008
@@ -29,10 +29,16 @@
     }
 
     /**
-     * The default domain to which this node belongs to
+     * The default domain to which this member belongs to. This node may be running in application
+     * or loadBalance mode
      */
     public static final String DEFAULT_DOMAIN = "apache.axis2.domain";
 
+    /**
+     * The default application domain, used when no "applicationDomain" parameter is set
+     */
+    public static final String DEFAULT_APP_DOMAIN = "apache.axis2.app.domain";
+
     public static final String NODE_MANAGER_SERVICE = "Axis2NodeManager";
     public static final String REQUEST_BLOCKING_HANDLER = "RequestBlockingHandler";
     public static final String CLUSTER_INITIALIZED = "local_cluster.initialized";
@@ -59,6 +65,22 @@
         public static final String DOMAIN = "domain";
 
         /**
+         * Indicates the mode in which this member is running. Valid values are "application" and
+         * "loadBalance"
+         *
+         * application - This member hosts end user applications
+         * loadBalance - This member is a part of the load balancer cluster
+         */
+        public static final String MODE = "mode";
+
+        /**
+         * This parameter is only valid when the "mode" parameter is set to "loadBalance"
+         *
+         * This indicates the domain in which the applications being load balanced are deployed.
+         */
+        public static final String APPLICATION_DOMAIN = "applicationDomain";
+
+        /**
          * When a Web service request is received, and processed, before the response is sent to the
          * client, should we update the states of all members in the cluster? If the value of
          * this parameter is set to "true", the response to the client will be sent only after
@@ -75,6 +97,22 @@
         public static final String AVOID_INITIATION = "AvoidInitiation";
     }
 
+    /**
+     * The modes in which a member can run; see the "mode" parameter
+     */
+    public static final class Mode{
+
+        /**
+         * Load balancing mode: this member is a part of the load balancer cluster
+         */
+        public static final String LOAD_BALANCE = "loadBalance";
+
+        /**
+         * Application mode: this member hosts end user applications
+         */
+        public static final String APPLICATION = "application";
+    }
+
     public static final class MembershipScheme {
         /**
          * Multicast based membership discovery scheme