You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/25 18:42:20 UTC

svn commit: r671600 [1/2] - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: ./ control/wka/ tribes/

Author: azeez
Date: Wed Jun 25 09:42:19 2008
New Revision: 671600

URL: http://svn.apache.org/viewvc?rev=671600&view=rev
Log:
Added load balancing with WKA based membership discovery. In the process of adding this new functionality, I had to carry out major refactoring of the code.


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/DefaultLoadBalanceEventHandler.java Wed Jun 25 09:42:19 2008
@@ -82,7 +82,7 @@
          */
         private boolean canConnect(Member member) {
             if(log.isDebugEnabled()){
-                log.debug("Trying to connect to member " + member.getHostName() + "...");
+                log.debug("Trying to connect to member " + member + "...");
             }
             for (int retries = 30; retries > 0; retries--) {
                 try {

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/MembershipScheme.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,37 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering;
+
+/**
+ * A representation of a membership scheme such as "multicast based" or "well-known address (WKA)
+ * based" schemes. This is directly related to the membership discovery mechanism.
+ */
+public interface MembershipScheme {
+
+    /**
+     * Initializes this membership scheme.
+     *
+     * @throws ClusteringFault If an error occurs while initializing
+     */
+    void init() throws ClusteringFault;
+
+    /**
+     * Joins the group.
+     *
+     * @throws ClusteringFault If an error occurs while joining the group
+     */
+    void joinGroup() throws ClusteringFault;
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Wed Jun 25 09:42:19 2008
@@ -20,7 +20,6 @@
 import org.apache.axis2.clustering.tribes.MembershipManager;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 
 import java.util.Arrays;
 
@@ -33,27 +32,20 @@
 
     private Member member;
     private MembershipManager membershipManager;
-    private StaticMembershipInterceptor staticMembershipInterceptor;
 
     public void setMembershipManager(MembershipManager membershipManager) {
         this.membershipManager = membershipManager;
     }
 
-    public void setStaticMembershipInterceptor(
-            StaticMembershipInterceptor staticMembershipInterceptor) {
-        this.staticMembershipInterceptor = staticMembershipInterceptor;
-    }
-
     public void setMember(Member member) {
         this.member = member;
     }
 
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
         Member localMember = membershipManager.getLocalMember();
-        if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
+        if (localMember == null || !(Arrays.equals(localMember.getHost(), member.getHost()) &&
               localMember.getPort() == member.getPort())) {
             membershipManager.memberAdded(member);
-            staticMembershipInterceptor.memberAdded(member);
         }
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Wed Jun 25 09:42:19 2008
@@ -21,7 +21,6 @@
 import org.apache.axis2.clustering.tribes.TribesUtil;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -37,33 +36,22 @@
     private static final Log log = LogFactory.getLog(MemberListCommand.class);
 
     private Member[] members;
-    private Member sender;
     private MembershipManager membershipManager;
-    private StaticMembershipInterceptor staticMembershipInterceptor;
 
     public void setMembershipManager(MembershipManager membershipManager) {
         this.membershipManager = membershipManager;
     }
 
-    public void setStaticMembershipInterceptor(StaticMembershipInterceptor staticMembershipInterceptor) {
-        this.staticMembershipInterceptor = staticMembershipInterceptor;
-    }
-
     public void setMembers(Member[] members) {
         this.members = members;
     }
 
-    public void setSender(Member sender) {
-        this.sender = sender;
-    }
-
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
         log.info("Received MEMBER_LIST message");
         Member localMember = membershipManager.getLocalMember();
         for (Member member : members) {
             addMember(localMember, member);
         }
-        addMember(localMember, sender);
     }
 
     private void addMember(Member localMember, Member member) {
@@ -72,7 +60,6 @@
               localMember.getPort() == member.getPort())) {
             log.info("Added member " + TribesUtil.getName(member));
             membershipManager.memberAdded(member);
-            staticMembershipInterceptor.memberAdded(member);
         }
     }
 }

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ApplicationMode.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,56 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ *  Represents a member running in application mode
+ */
+public class ApplicationMode implements Mode {
+
+    private static final Log log = LogFactory.getLog(ApplicationMode.class);
+
+    private final byte[] loadBalancerDomain;
+
+    public ApplicationMode(byte[] loadBalancerDomain) {
+        this.loadBalancerDomain = loadBalancerDomain;
+    }
+
+    public void addInterceptors(Channel channel) {
+        DomainFilterInterceptor dfi = new DomainFilterInterceptor();
+        dfi.setOptionFlag(TribesConstants.MEMBERSHIP_MSG_OPTION);
+        dfi.setDomain(loadBalancerDomain);
+        channel.addInterceptor(dfi);
+        if (log.isDebugEnabled()) {
+            log.debug("Added Domain Filter Interceptor");
+        }
+    }
+
+    public void init(Channel channel) {
+        // Nothing to be done: application mode needs no setup beyond its interceptor
+    }
+
+    public List<MembershipManager> getMembershipManagers() {
+        return new ArrayList<MembershipManager>();
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerInterceptor.java Wed Jun 25 09:42:19 2008
@@ -15,20 +15,11 @@
  */
 package org.apache.axis2.clustering.tribes;
 
-import org.apache.axis2.clustering.LoadBalanceEventHandler;
 import org.apache.catalina.tribes.ChannelMessage;
-import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.ChannelInterceptorBase;
-import org.apache.catalina.tribes.membership.MemberImpl;
 import org.apache.catalina.tribes.membership.Membership;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.util.Arrays;
-import java.util.Map;
-import java.util.Properties;
 
 /**
  * This interceptor is used when this member is part of a load balancer cluster.
@@ -36,7 +27,6 @@
  * another group.
  */
 public class LoadBalancerInterceptor extends ChannelInterceptorBase {
-    private static final Log log = LogFactory.getLog(LoadBalancerInterceptor.class);
 
     /**
      * Represents the load balancer group
@@ -48,146 +38,15 @@
      */
     protected byte[] loadBalancerDomain = new byte[0];
 
-    /**
-     * Represents the group in which the applications being load balanced, are deployed and their
-     * respective load balancer event handlers
-     */
-    private Map<String, LoadBalanceEventHandler> lbEventHandlers;
-
-    public LoadBalancerInterceptor(byte[] loadBalancerDomain,
-                                   Map<String, LoadBalanceEventHandler> lbEventHandlers) {
+    public LoadBalancerInterceptor(byte[] loadBalancerDomain) {
         this.loadBalancerDomain = loadBalancerDomain;
-        this.lbEventHandlers = lbEventHandlers;
-    }
-
-    public void setLbEventHandlers(Map<String, LoadBalanceEventHandler> lbEventHandlers) {
-        this.lbEventHandlers = lbEventHandlers;
     }
 
     public void messageReceived(ChannelMessage msg) {
-
         // Ignore all messages which are not intended for the load balancer group
-        if (Arrays.equals(msg.getAddress().getDomain(), loadBalancerDomain)) {
+        if (okToProcess(msg.getOptions()) ||
+            Arrays.equals(msg.getAddress().getDomain(), loadBalancerDomain)) {
             super.messageReceived(msg);
         }
     }
-
-    public void memberAdded(Member member) {
-        if (loadBalancerMembership == null) {
-            setupMembership();
-        }
-        boolean notify;
-        synchronized (loadBalancerMembership) {
-            notify = Arrays.equals(loadBalancerDomain, member.getDomain());
-            if (notify) {
-                notify = loadBalancerMembership.memberAlive((MemberImpl) member);
-            }
-        }
-        if (notify) {
-            super.memberAdded(member);
-
-        }
-
-        // Is this an application domain member?
-        for (String applicationDomain : lbEventHandlers.keySet()) {
-            if (Arrays.equals(applicationDomain.getBytes(), member.getDomain())) {
-                log.info("Application member " + TribesUtil.getName(member) + " joined group " +
-                         applicationDomain);
-                LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
-                if (eventHandler != null) {
-                    eventHandler.applicationMemberAdded(toAxis2Member(member));
-                }
-                break;
-            }
-        }
-    }
-
-    private org.apache.axis2.clustering.Member toAxis2Member(Member member) {
-        org.apache.axis2.clustering.Member axis2Member =
-                new org.apache.axis2.clustering.Member(TribesUtil.getHost(member),
-                                                       member.getPort());
-        Properties props = getProperties(member.getPayload());
-
-        String http = props.getProperty("HTTP");
-        if (http != null && http.trim().length() != 0) {
-            axis2Member.setHttpPort(Integer.parseInt(http));
-        }
-
-        String https = props.getProperty("HTTPS");
-        if (https != null && https.trim().length() != 0) {
-            axis2Member.setHttpsPort(Integer.parseInt(https));
-        }
-        
-        return axis2Member;
-    }
-
-    private Properties getProperties(byte[] payload) {
-        Properties props = null;
-        try {
-            ByteArrayInputStream bin = new ByteArrayInputStream(payload);
-            props = new Properties();
-            props.load(bin);
-        } catch (IOException ignored) {
-            // This error will never occur
-        }
-        return props;
-    }
-
-    public void memberDisappeared(Member member) {
-        if (loadBalancerMembership == null) {
-            setupMembership();
-        }
-        boolean notify;
-        synchronized (loadBalancerMembership) {
-            notify = Arrays.equals(loadBalancerDomain, member.getDomain());
-            loadBalancerMembership.removeMember((MemberImpl) member);
-        }
-        if (notify) {
-            super.memberDisappeared(member);
-        }
-
-        // Is this an application domain member?
-        for (String applicationDomain : lbEventHandlers.keySet()) {
-            if (Arrays.equals(applicationDomain.getBytes(), member.getDomain())) {
-                log.info("Application member " + TribesUtil.getName(member) + " left group " +
-                         applicationDomain);
-                LoadBalanceEventHandler eventHandler = lbEventHandlers.get(applicationDomain);
-                if (eventHandler != null) {
-                    eventHandler.applicationMemberRemoved(toAxis2Member(member));
-                }
-                break;
-            }
-        }
-    }
-
-    public boolean hasMembers() {
-        if (loadBalancerMembership == null) {
-            setupMembership();
-        }
-        return loadBalancerMembership.hasMembers();
-    }
-
-    public Member[] getMembers() {
-        if (loadBalancerMembership == null) {
-            setupMembership();
-        }
-        return loadBalancerMembership.getMembers();
-    }
-
-    public Member getMember(Member mbr) {
-        if (loadBalancerMembership == null) {
-            setupMembership();
-        }
-        return loadBalancerMembership.getMember(mbr);
-    }
-
-    public Member getLocalMember(boolean incAlive) {
-        return super.getLocalMember(incAlive);
-    }
-
-    protected synchronized void setupMembership() {
-        if (loadBalancerMembership == null) {
-            loadBalancerMembership = new Membership((MemberImpl) super.getLocalMember(true));
-        }
-    }
 }

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/LoadBalancerMode.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,81 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.LoadBalanceEventHandler;
+import org.apache.catalina.tribes.Channel;
+import org.apache.catalina.tribes.MembershipListener;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents a member running in load balance mode
+ */
+public class LoadBalancerMode implements Mode {
+
+    private static final Log log = LogFactory.getLog(LoadBalancerMode.class);
+
+    private final byte[] loadBalancerDomain;
+    private final Map<String, LoadBalanceEventHandler> lbEventHandlers;
+    private final List<MembershipManager> membershipManagers = new ArrayList<MembershipManager>();
+
+    public LoadBalancerMode(byte[] loadBalancerDomain,
+                            Map<String, LoadBalanceEventHandler> lbEventHandlers) {
+        this.loadBalancerDomain = loadBalancerDomain;
+        this.lbEventHandlers = lbEventHandlers;
+    }
+
+    public void addInterceptors(Channel channel) {
+        LoadBalancerInterceptor lbInterceptor =
+                new LoadBalancerInterceptor(loadBalancerDomain);
+        lbInterceptor.setOptionFlag(TribesConstants.MEMBERSHIP_MSG_OPTION);
+        channel.addInterceptor(lbInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added Load Balancer Interceptor");
+        }
+    }
+
+    public void init(Channel channel) {
+        // Create one MembershipManager per application domain and feed it the channel's
+        // membership events. This is needed only when this member is running as a load balancer
+        for (Map.Entry<String, LoadBalanceEventHandler> entry : lbEventHandlers.entrySet()) {
+            String domain = entry.getKey();
+            final MembershipManager membershipManager = new MembershipManager();
+            membershipManager.setDomain(domain.getBytes());
+            membershipManager.setLoadBalanceEventHandler(entry.getValue());
+
+            MembershipListener membershipListener = new MembershipListener() {
+                public void memberAdded(org.apache.catalina.tribes.Member member) {
+                    membershipManager.memberAdded(member);
+                }
+
+                public void memberDisappeared(org.apache.catalina.tribes.Member member) {
+                    membershipManager.memberDisappeared(member);
+                }
+            };
+            channel.addMembershipListener(membershipListener);
+            membershipManagers.add(membershipManager);
+        }
+    }
+
+    public List<MembershipManager> getMembershipManagers() {
+        return membershipManagers;
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Wed Jun 25 09:42:19 2008
@@ -19,17 +19,25 @@
 
 package org.apache.axis2.clustering.tribes;
 
+import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.LoadBalanceEventHandler;
 import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.catalina.tribes.membership.MemberImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Properties;
 import java.util.Random;
 
 /**
@@ -40,6 +48,18 @@
     private static final Log log = LogFactory.getLog(MembershipManager.class);
 
     private RpcChannel rpcChannel;
+    private StaticMembershipInterceptor staticMembershipInterceptor;
+
+    /**
+     * The domain corresponding to the membership handled by this MembershipManager
+     */
+    private byte[] domain;
+    private LoadBalanceEventHandler loadBalanceEventHandler;
+    private ConfigurationContext configContext;
+
+    public MembershipManager(ConfigurationContext configContext) {
+        this.configContext = configContext;
+    }
 
     public MembershipManager() {
     }
@@ -48,6 +68,23 @@
         this.rpcChannel = rpcChannel;
     }
 
+    public void setStaticMembershipInterceptor(
+            StaticMembershipInterceptor staticMembershipInterceptor) {
+        this.staticMembershipInterceptor = staticMembershipInterceptor;
+    }
+
+    public void setLoadBalanceEventHandler(LoadBalanceEventHandler loadBalanceEventHandler) {
+        this.loadBalanceEventHandler = loadBalanceEventHandler;
+    }
+
+    public void setDomain(byte[] domain) {
+        this.domain = domain;
+    }
+
+    public byte[] getDomain() {
+        return domain;
+    }
+
     /**
      * List of current members in the cluster. Only the members who are alive will be in this
      * list
@@ -84,19 +121,47 @@
      * A new member is added
      *
      * @param member The new member that joined the cluster
-     * @return true - if the member was added to the <code>members</code> array; false, otherwise.
+     * @return true if the member was added to the <code>members</code> list; false otherwise.
      */
     public synchronized boolean memberAdded(Member member) {
-        if (!members.contains(member)) {
-            if (rpcChannel != null && wkaMembers.contains(member)) { // if it is a well-known member
+
+        // If this member already exists or if the member belongs to another domain,
+        // there is no need to add it
+        if(members.contains(member) || !Arrays.equals(domain, member.getDomain())){
+            return false;
+        }
+
+        if (staticMembershipInterceptor != null) { // this interceptor is null when multicast based scheme is used
+            staticMembershipInterceptor.addStaticMember(member);
+            if (log.isDebugEnabled()) {
+                log.debug("Added static member " + TribesUtil.getName(member) +
+                          " from domain " + new String(member.getDomain()));
+            }
+        }
+
+        boolean shouldAddMember = localMember == null ||
+                                  Arrays.equals(localMember.getDomain(), member.getDomain());
+
+        // If this member is a load balancer, notify the respective load balance event handler
+        if (loadBalanceEventHandler != null) {
+            log.info("Application member " + TribesUtil.getName(member) + " joined group " +
+                     new String(member.getDomain()));
+            loadBalanceEventHandler.applicationMemberAdded(toAxis2Member(member));
+        }
+
+        if (shouldAddMember) {
+            if (rpcChannel != null && isLocalMemberInitialized() &&
+                wkaMembers.contains(member)) { // if it is a well-known member
 
                 log.info("A WKA member " + TribesUtil.getName(member) +
                          " just joined the group. Sending MEMBER_LIST message.");
-                // send the memeber list to it
+                // send the member list to it
                 MemberListCommand memListCmd;
                 try {
                     memListCmd = new MemberListCommand();
-                    memListCmd.setMembers(getMembers());
+                    List<Member> members = new ArrayList<Member>(this.members);
+                    members.add(localMember); // Need to set the local member too
+                    memListCmd.setMembers(members.toArray(new Member[members.size()]));
                     rpcChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
                                     Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
                 } catch (Exception e) {
@@ -107,11 +172,55 @@
                 }
             }
             members.add(member);
+            if (log.isDebugEnabled()) {
+                log.debug("Added group member " + TribesUtil.getName(member) + " to domain " +
+                          new String(member.getDomain()));
+            }
             return true;
         }
         return false;
     }
 
+    /** Converts a Tribes member into an Axis2 member, copying HTTP/HTTPS ports when present. */
+    private org.apache.axis2.clustering.Member toAxis2Member(Member member) {
+        org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(
+                TribesUtil.getHost(member), member.getPort());
+        Properties props = getProperties(member.getPayload());
+        // Parse the trimmed values: Properties.load keeps trailing whitespace in values
+        String http = props.getProperty("HTTP");
+        if (http != null && http.trim().length() != 0) {
+            axis2Member.setHttpPort(Integer.parseInt(http.trim()));
+        }
+
+        String https = props.getProperty("HTTPS");
+        if (https != null && https.trim().length() != 0) {
+            axis2Member.setHttpsPort(Integer.parseInt(https.trim()));
+        }
+
+        return axis2Member;
+    }
+
+    /** Loads the given payload bytes into a Properties object. */
+    private Properties getProperties(byte[] payload) {
+        Properties properties = new Properties();
+        ByteArrayInputStream payloadStream = new ByteArrayInputStream(payload);
+        try {
+            properties.load(payloadStream);
+        } catch (IOException ignored) {
+            // Not expected: the stream reads from an in-memory byte array
+        }
+        return properties;
+    }
+
+    /** Checks whether the CLUSTER_INITIALIZED property of the config context equals "true". */
+    private boolean isLocalMemberInitialized() {
+        if (configContext == null) {
+            return false;
+        }
+        return "true".equals(
+                configContext.getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED));
+    }
+
     /**
      * A member disappeared
      *
@@ -119,6 +228,11 @@
      */
     public synchronized void memberDisappeared(Member member) {
         members.remove(member);
+
+        // Is this an application domain member?
+        if (loadBalanceEventHandler != null) {
+            loadBalanceEventHandler.applicationMemberRemoved(toAxis2Member(member));
+        }
     }
 
     /**

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/Mode.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,42 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.Channel;
+
+import java.util.List;
+
+/**
+ * The mode in which this member is running, such as loadBalance or application
+ */
+public interface Mode {
+
+    /**
+     * Add channel interceptors
+     *
+     * @param channel The Channel to which interceptors need to be added
+     */
+    public void addInterceptors(Channel channel);
+
+    /**
+     * Initialize this mode
+     *
+     * @param channel The channel related to this member
+     */
+    void init(Channel channel);
+
+    List<MembershipManager> getMembershipManagers();
+}

Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java?rev=671600&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MulticastBasedMembershipScheme.java Wed Jun 25 09:42:19 2008
@@ -0,0 +1,183 @@
+/*                                                                             
+ * Copyright 2004,2005 The Apache Software Foundation.                         
+ *                                                                             
+ * Licensed under the Apache License, Version 2.0 (the "License");             
+ * you may not use this file except in compliance with the License.            
+ * You may obtain a copy of the License at                                     
+ *                                                                             
+ *      http://www.apache.org/licenses/LICENSE-2.0                             
+ *                                                                             
+ * Unless required by applicable law or agreed to in writing, software         
+ * distributed under the License is distributed on an "AS IS" BASIS,           
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.    
+ * See the License for the specific language governing permissions and         
+ * limitations under the License.                                              
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.axis2.clustering.ClusteringConstants;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.MembershipScheme;
+import org.apache.axis2.description.Parameter;
+import org.apache.axis2.util.Utils;
+import org.apache.catalina.tribes.ManagedChannel;
+import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
+import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
+import org.apache.catalina.tribes.transport.ReceiverBase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.net.SocketException;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of the multicast based membership scheme. In this scheme, membership is discovered
+ * using multicasts
+ */
+public class MulticastBasedMembershipScheme implements MembershipScheme {
+
+    private static final Log log = LogFactory.getLog(MulticastBasedMembershipScheme.class);
+
+    /**
+     * The Tribes channel
+     */
+    private ManagedChannel channel;
+    private Map<String, Parameter> parameters;
+
+    /**
+     * The domain to which this node belongs
+     */
+    private byte[] domain;
+
+    /**
+     * The mode in which this member operates such as "loadBalance" or "application"
+     */
+    private Mode mode;
+
+    public MulticastBasedMembershipScheme(ManagedChannel channel,
+                                          Mode mode,
+                                          Map<String, Parameter> parameters,
+                                          byte[] domain) {
+        this.channel = channel;
+        this.mode = mode;
+        this.parameters = parameters;
+        this.domain = domain;
+    }
+
+    public void init() throws ClusteringFault {
+        addInterceptors();
+        configureMulticastParameters();
+    }
+
+    public void joinGroup() throws ClusteringFault {
+        // Nothing to do
+    }
+
+    private void configureMulticastParameters() throws ClusteringFault {
+        Properties mcastProps = channel.getMembershipService().getProperties();
+        Parameter mcastAddress = getParameter(TribesConstants.MCAST_ADDRESS);
+        if (mcastAddress != null) {
+            mcastProps.setProperty(TribesConstants.MCAST_ADDRESS,
+                                   ((String) mcastAddress.getValue()).trim());
+        }
+        Parameter mcastBindAddress = getParameter(TribesConstants.MCAST_BIND_ADDRESS);
+        if (mcastBindAddress != null) {
+            mcastProps.setProperty(TribesConstants.MCAST_BIND_ADDRESS,
+                                   ((String) mcastBindAddress.getValue()).trim());
+        }
+
+        Parameter mcastPort = getParameter(TribesConstants.MCAST_PORT);
+        if (mcastPort != null) {
+            mcastProps.setProperty(TribesConstants.MCAST_PORT,
+                                   ((String) mcastPort.getValue()).trim());
+        }
+        Parameter mcastFrequency = getParameter(TribesConstants.MCAST_FREQUENCY);
+        if (mcastFrequency != null) {
+            mcastProps.setProperty(TribesConstants.MCAST_FREQUENCY,
+                                   ((String) mcastFrequency.getValue()).trim());
+        }
+        Parameter mcastMemberDropTime = getParameter(TribesConstants.MEMBER_DROP_TIME);
+        if (mcastMemberDropTime != null) {
+            mcastProps.setProperty(TribesConstants.MEMBER_DROP_TIME,
+                                   ((String) mcastMemberDropTime.getValue()).trim());
+        }
+
+        // Set the IP address that will be advertised by this node
+        ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
+        Parameter tcpListenHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
+        if (tcpListenHost != null) {
+            String host = ((String) tcpListenHost.getValue()).trim();
+            mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
+            mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
+            receiver.setAddress(host);
+        } else {
+            String host;
+            try {
+                host = Utils.getIpAddress();
+            } catch (SocketException e) {
+                String msg = "Could not get local IP address";
+                log.error(msg, e);
+                throw new ClusteringFault(msg, e);
+            }
+            mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
+            mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
+            receiver.setAddress(host);
+        }
+        String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
+        if (localIP != null) {
+            receiver.setAddress(localIP);
+        }
+
+        Parameter tcpListenPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
+        if (tcpListenPort != null) {
+            String port = ((String) tcpListenPort.getValue()).trim();
+            mcastProps.setProperty(TribesConstants.TCP_LISTEN_PORT, port);
+            receiver.setPort(Integer.parseInt(port));
+        }
+
+        mcastProps.setProperty(TribesConstants.MCAST_CLUSTER_DOMAIN, new String(domain));
+    }
+
+    /**
+     * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
+     * membership management scheme
+     */
+    private void addInterceptors() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Adding Interceptors...");
+        }
+
+        // Add a reliable failure detector
+        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
+        tcpFailureDetector.setConnectTimeout(30000);
+        channel.addInterceptor(tcpFailureDetector);
+        if (log.isDebugEnabled()) {
+            log.debug("Added TCP Failure Detector");
+        }
+
+        channel.getMembershipService().setDomain(domain);
+        mode.addInterceptors(channel);
+
+        // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+        atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
+        channel.addInterceptor(atMostOnceInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added At-most-once Interceptor");
+        }
+
+        // Add the OrderInterceptor to preserve sender ordering
+        OrderInterceptor orderInterceptor = new OrderInterceptor();
+        orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
+        channel.addInterceptor(orderInterceptor);
+        if (log.isDebugEnabled()) {
+            log.debug("Added Message Order Interceptor");
+        }
+    }
+
+    public Parameter getParameter(String name) {
+        return parameters.get(name);
+    }
+}

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Wed Jun 25 09:42:19 2008
@@ -32,7 +32,6 @@
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -46,14 +45,11 @@
     private static Log log = LogFactory.getLog(RpcRequestHandler.class);
     private ConfigurationContext configurationContext;
     private MembershipManager membershipManager;
-    private StaticMembershipInterceptor staticMembershipInterceptor;
 
     public RpcRequestHandler(ConfigurationContext configurationContext,
-                             MembershipManager membershipManager,
-                             StaticMembershipInterceptor staticMembershipInterceptor) {
+                             MembershipManager membershipManager) {
         this.configurationContext = configurationContext;
         this.membershipManager = membershipManager;
-        this.staticMembershipInterceptor = staticMembershipInterceptor;
     }
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
@@ -104,27 +100,17 @@
             }
         } else if (msg instanceof JoinGroupCommand) {
             log.info("Received JOIN message from " + TribesUtil.getName(invoker));
-            MemberListCommand memListCmd;
-            try {
-                // Add the member
-                staticMembershipInterceptor.memberAdded(invoker);
-                membershipManager.memberAdded(invoker);
-
-                // Return the list of current members to the caller
-                memListCmd = new MemberListCommand();
-                memListCmd.setMembers(membershipManager.getMembers());
-            } catch (Exception e) {
-                String errMsg = "Cannot handle JOIN request";
-                log.error(errMsg, e);
-                throw new RemoteProcessException(errMsg, e);
-            }
+            membershipManager.memberAdded(invoker);
+
+            // Return the list of current members to the caller
+            MemberListCommand memListCmd = new MemberListCommand();
+            memListCmd.setMembers(membershipManager.getMembers());
             return memListCmd;
         } else if (msg instanceof MemberJoinedCommand) {
             log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(invoker));
             try {
                 MemberJoinedCommand command = (MemberJoinedCommand) msg;
                 command.setMembershipManager(membershipManager);
-                command.setStaticMembershipInterceptor(staticMembershipInterceptor);
                 command.execute(configurationContext);
             } catch (ClusteringFault e) {
                 String errMsg = "Cannot handle MEMBER_JOINED notification";
@@ -135,8 +121,6 @@
             try {                    //TODO: What if we receive more than one member list message?
                 MemberListCommand command = (MemberListCommand) msg;
                 command.setMembershipManager(membershipManager);
-                command.setStaticMembershipInterceptor(staticMembershipInterceptor);
-                command.setSender(invoker);
                 command.execute(configurationContext);
 
                 //TODO Send MEMBER_JOINED messages to all nodes

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed Jun 25 09:42:19 2008
@@ -21,11 +21,11 @@
 
 import org.apache.axiom.om.OMElement;
 import org.apache.axis2.AxisFault;
-import org.apache.axis2.util.Utils;
 import org.apache.axis2.clustering.ClusterManager;
 import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.LoadBalanceEventHandler;
+import org.apache.axis2.clustering.MembershipScheme;
 import org.apache.axis2.clustering.RequestBlockingHandler;
 import org.apache.axis2.clustering.configuration.ConfigurationManager;
 import org.apache.axis2.clustering.configuration.DefaultConfigurationManager;
@@ -35,9 +35,6 @@
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
-import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
-import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
-import org.apache.axis2.clustering.control.wka.MemberListCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.HandlerDescription;
 import org.apache.axis2.description.Parameter;
@@ -53,28 +50,14 @@
 import org.apache.catalina.tribes.group.GroupChannel;
 import org.apache.catalina.tribes.group.Response;
 import org.apache.catalina.tribes.group.RpcChannel;
-import org.apache.catalina.tribes.group.interceptors.DomainFilterInterceptor;
-import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
-import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
-import org.apache.catalina.tribes.group.interceptors.TcpFailureDetector;
-import org.apache.catalina.tribes.group.interceptors.TcpPingInterceptor;
-import org.apache.catalina.tribes.membership.StaticMember;
 import org.apache.catalina.tribes.transport.MultiPointSender;
-import org.apache.catalina.tribes.transport.ReceiverBase;
 import org.apache.catalina.tribes.transport.ReplicationTransmitter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -98,7 +81,16 @@
     private ChannelSender channelSender;
     private MembershipManager membershipManager;
     private RpcRequestHandler rpcRequestHandler;
-    private StaticMembershipInterceptor staticMembershipInterceptor;
+    private MembershipScheme membershipScheme;
+
+    /**
+     * The mode in which this member operates such as "loadBalance" or "application"
+     */
+    private Mode mode;
+
+    /**
+     * Static members
+     */
     private List<org.apache.axis2.clustering.Member> members;
 
     private final Map<String, LoadBalanceEventHandler> lbEventHandlers =
@@ -145,30 +137,30 @@
     public void init() throws ClusteringFault {
         log.info("Initializing cluster...");
         addRequestBlockingHandlerToInFlows();
-        membershipManager = new MembershipManager();
+        membershipManager = new MembershipManager(configurationContext);
+
         channel = new GroupChannel();
         channelSender = new ChannelSender(channel, membershipManager, synchronizeAllMembers());
         channelListener =
                 new ChannelListener(configurationContext, configurationManager, contextManager);
-
-        setMaximumRetries();
-
-        String membershipScheme = getMembershipScheme();
-        log.info("Using " + membershipScheme + " based membership management scheme");
+        channel.addChannelListener(channelListener);
 
         byte[] domain = getClusterDomain();
         log.info("Cluster domain: " + new String(domain));
+        membershipManager.setDomain(domain);
 
-        // Add all the ChannelInterceptors
-        addInterceptors(channel, domain, membershipScheme);
-
-        // Membership scheme handling
-        // If it is a WKA scheme, connect to a WKA and get a list of members. Add the members
-        // to the MembershipManager
-        configureMembershipScheme(domain, membershipScheme);
-
-        channel.addChannelListener(channelListener);
+        // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
+        // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+        rpcRequestHandler = new RpcRequestHandler(configurationContext, membershipManager);
+        rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+        if (log.isDebugEnabled()) {
+            log.debug("Created RPC Channel for domain " + new String(domain));
+        }
+        membershipManager.setRpcChannel(rpcChannel);
 
+        setMaximumRetries();
+        configureMode(domain);
+        configureMembershipScheme(domain, mode.getMembershipManagers());
         setMemberTransportInfo();
 
         TribesMembershipListener membershipListener = new TribesMembershipListener(membershipManager);
@@ -188,72 +180,10 @@
             throw new ClusteringFault(msg, e);
         }
 
-        // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
-        // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
-        rpcRequestHandler = new RpcRequestHandler(configurationContext,
-                                                  membershipManager,
-                                                  staticMembershipInterceptor);
-        rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
-        membershipManager.setRpcChannel(rpcChannel);
-
-
         log.info("Local Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers(membershipManager);
 
-        // If a WKA scheme is used, JOIN the group and get the member list
-        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)
-            && membershipManager.getMembers().length > 0) {
-            log.info("Sending JOIN message to WKA members...");
-            Member[] wkaMembers = membershipManager.getMembers(); // The well-known members
-            try {
-                Thread.sleep(3000); // Wait for sometime so that the WKA members can receive the MEMBER_LIST message, if they have just joined the group
-            } catch (InterruptedException ignored) {
-            }
-            Response[] responses = null;
-            do {
-                try {
-                    responses = rpcChannel.send(wkaMembers,
-                                                new JoinGroupCommand(),
-                                                RpcChannel.ALL_REPLY,
-                                                Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                                10000);
-                    if (responses.length == 0) {
-                        try {
-                            Thread.sleep(500);
-                        } catch (InterruptedException ignored) {
-                        }
-                    }
-                } catch (Exception e) {
-                    String msg = "Error occurred while trying to send JOIN request to WKA members";
-                    log.error(msg, e);
-                }
-
-                // TODO: If we do not get a response within some time, try to recover from this fault
-            }
-            while (responses == null || responses.length == 0);  // Wait until we've received at least one response
-
-            for (Response response : responses) {
-                MemberListCommand command = (MemberListCommand) response.getMessage();
-                command.setMembershipManager(membershipManager);
-                command.setStaticMembershipInterceptor(staticMembershipInterceptor);
-                command.setSender(response.getSource());
-                command.execute(configurationContext);
-            }
-
-            if (membershipManager.getMembers().length > 0) {
-                log.info("Sending MEMBER_JOINED to group...");
-                MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
-                memberJoinedCommand.setMember(membershipManager.getLocalMember());
-                try {
-                    rpcChannel.send(membershipManager.getMembers(), memberJoinedCommand,
-                                    RpcChannel.ALL_REPLY, Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
-                } catch (ChannelException e) {
-                    String msg = "Could not send MEMBER_JOINED message to group";
-                    log.error(msg, e);
-                    throw new ClusteringFault(msg, e);
-                }
-            }
-        }
+        membershipScheme.joinGroup();
 
         // If configuration management is enabled, get the latest config from a neighbour
         if (configurationManager != null) {
@@ -409,362 +339,42 @@
         }
     }
 
+    private void configureMode(byte[] domain) {
+        if (loadBalanceMode) {
+            mode = new LoadBalancerMode(domain, lbEventHandlers);
+        } else {
+            mode = new ApplicationMode(domain);
+        }
+        mode.init(channel);
+    }
+
     /**
      * Handle specific configurations related to different membership management schemes.
      *
-     * @param domain           The clustering loadBalancerDomain to which this member belongs to
-     * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
+     * @param domain             The clustering loadBalancerDomain to which this member belongs
+     * @param membershipManagers MembershipManagers for different domains
      * @throws ClusteringFault If the membership scheme is invalid, or if an error occurs
      *                         while configuring membership scheme
      */
-    private void configureMembershipScheme(byte[] domain, String membershipScheme)
+    private void configureMembershipScheme(byte[] domain,
+                                           List<MembershipManager> membershipManagers)
             throws ClusteringFault {
-
-        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
-            configureWkaBasedMembership(domain);
-        } else if (membershipScheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
-            configureMulticastBasedMembership(channel, domain);
+        String scheme = getMembershipScheme();
+        log.info("Using " + scheme + " based membership management scheme");
+        if (scheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
+            membershipScheme = new WkaBasedMembershipScheme(channel, mode,
+                                                            membershipManagers,
+                                                            rpcChannel, membershipManager,
+                                                            parameters, domain, members);
+        } else if (scheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
+            membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters, domain);
         } else {
-            String msg = "Invalid membership scheme '" + membershipScheme +
+            String msg = "Invalid membership scheme '" + scheme +
                          "'. Supported schemes are multicast & wka";
             log.error(msg);
             throw new ClusteringFault(msg);
         }
-    }
-
-    /**
-     * Configure the membership related to the WKA based scheme
-     *
-     * @param domain The loadBalancerDomain to which the members belong to
-     * @throws ClusteringFault If an error occurs while configuring this scheme
-     */
-    private void configureWkaBasedMembership(byte[] domain) throws ClusteringFault {
-        channel.setMembershipService(new WkaMembershipService(membershipManager));
-        StaticMember localMember = new StaticMember();
-        membershipManager.setLocalMember(localMember);
-        ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
-
-        // ------------ START: Configure and add the local member ---------------------
-        Parameter localHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
-        String host;
-        if (localHost != null) {
-            host = ((String) localHost.getValue()).trim();
-        } else { // In cases where the localhost needs to be automatically figured out
-            try {
-                try {
-                    host = Utils.getIpAddress();
-                } catch (SocketException e) {
-                    String msg = "Could not get local IP address";
-                    log.error(msg, e);
-                    throw new ClusteringFault(msg, e);
-                }
-            } catch (Exception e) {
-                String msg = "Could not get the localhost name";
-                log.error(msg, e);
-                throw new ClusteringFault(msg, e);
-            }
-        }
-        receiver.setAddress(host);
-        try {
-            localMember.setHostname(host);
-        } catch (IOException e) {
-            String msg = "Could not set the local member's name";
-            log.error(msg, e);
-            throw new ClusteringFault(msg, e);
-        }
-
-        Parameter localPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
-        int port;
-        try {
-            if (localPort != null) {
-                port = Integer.parseInt(((String) localPort.getValue()).trim());
-                port = getLocalPort(new ServerSocket(), localMember.getHostname(), port, 4000, 100);
-            } else { // In cases where the localport needs to be automatically figured out
-                port = getLocalPort(new ServerSocket(), localMember.getHostname(), -1, 4000, 100);
-            }
-        } catch (IOException e) {
-            String msg =
-                    "Could not allocate the specified port or a port in the range 4000-4100 " +
-                    "for local host " + localMember.getHostname() +
-                    ". Check whether the IP address specified or inferred for the local " +
-                    "member is correct.";
-            log.error(msg, e);
-            throw new ClusteringFault(msg, e);
-        }
-
-        byte[] payload = "ping".getBytes();
-        localMember.setPayload(payload);
-        receiver.setPort(port);
-        localMember.setPort(port);
-        localMember.setDomain(domain);
-        staticMembershipInterceptor.setLocalMember(localMember);
-
-        // ------------ END: Configure and add the local member ---------------------
-
-        // ------------ START: Add other members ---------------------
-        for (org.apache.axis2.clustering.Member member : members) {
-            StaticMember tribesMember;
-            try {
-                tribesMember = new StaticMember(member.getHostName(), member.getPort(),
-                                                0, payload);
-            } catch (IOException e) {
-                String msg = "Could not add static member " +
-                             member.getHostName() + ":" + member.getPort();
-                log.error(msg, e);
-                throw new ClusteringFault(msg, e);
-            }
-
-            // Do not add the local member to the list of members
-            if (!(Arrays.equals(localMember.getHost(), tribesMember.getHost()) &&
-                  localMember.getPort() == tribesMember.getPort())) {
-                tribesMember.setDomain(domain);
-
-                // We will add the member even if it is offline at this moment. When the
-                // member comes online, it will be detected by the GMS
-                staticMembershipInterceptor.addStaticMember(tribesMember);
-                membershipManager.addWellKnownMember(tribesMember);
-                if (canConnect(member)) {
-                    membershipManager.memberAdded(tribesMember);
-                    log.info("Added static member " + TribesUtil.getName(tribesMember));
-                } else {
-                    log.info("Could not connect to member " + TribesUtil.getName(tribesMember));
-                }
-            }
-        }
-    }
-
-    /**
-     * Before adding a static member, we will try to verify whether we can connect to it
-     *
-     * @param member The member whose connectvity needs to be verified
-     * @return true, if the member can be contacted; false, otherwise.
-     */
-    private boolean canConnect(org.apache.axis2.clustering.Member member) {
-        for (int retries = 5; retries > 0; retries--) {
-            try {
-                InetAddress addr = InetAddress.getByName(member.getHostName());
-                SocketAddress sockaddr = new InetSocketAddress(addr,
-                                                               member.getPort());
-                new Socket().connect(sockaddr, 500);
-                return true;
-            } catch (IOException e) {
-                String msg = e.getMessage();
-                if (msg.indexOf("Connection refused") == -1 && msg.indexOf("connect timed out") == -1) {
-                    log.error("Cannot connect to member " +
-                              member.getHostName() + ":" + member.getPort(), e);
-                }
-            }
-        }
-        return false;
-    }
-
-    protected int getLocalPort(ServerSocket socket, String hostname,
-                               int preferredPort, int portstart, int retries) throws IOException {
-        if (preferredPort != -1) {
-            try {
-                return getLocalPort(socket, hostname, preferredPort);
-            } catch (IOException ignored) {
-                // Fall through and try a default port
-            }
-        }
-        InetSocketAddress addr = null;
-        if (retries > 0) {
-            try {
-                return getLocalPort(socket, hostname, portstart);
-            } catch (IOException x) {
-                retries--;
-                if (retries <= 0) {
-                    log.error("Unable to bind server socket to:" + addr + " throwing error.");
-                    throw x;
-                }
-                portstart++;
-                try {
-                    Thread.sleep(50);
-                } catch (InterruptedException ignored) {
-                    ignored.printStackTrace();
-                }
-                getLocalPort(socket, hostname, portstart, retries, -1);
-            }
-        }
-        return portstart;
-    }
-
-    private int getLocalPort(ServerSocket socket, String hostname, int port) throws IOException {
-        InetSocketAddress addr;
-        addr = new InetSocketAddress(hostname, port);
-        socket.bind(addr);
-        log.info("Receiver Server Socket bound to:" + addr);
-        socket.setSoTimeout(5);
-        socket.close();
-        try {
-            Thread.sleep(100);
-        } catch (InterruptedException ignored) {
-            ignored.printStackTrace();
-        }
-        return port;
-    }
-
-    /**
-     * Add ChannelInterceptors. The order of the interceptors that are added will depend on the
-     * membership management scheme
-     *
-     * @param channel          The Tribes channel
-     * @param domain           The loadBalancerDomain to which this node belongs to
-     * @param membershipScheme The membership scheme. Only wka & multicast are valid values.
-     * @throws ClusteringFault If an error occurs while adding interceptors
-     */
-    private void addInterceptors(ManagedChannel channel,
-                                 byte[] domain,
-                                 String membershipScheme) throws ClusteringFault {
-
-        if (log.isDebugEnabled()) {
-            log.debug("Adding Interceptors...");
-        }
-        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
-            TcpPingInterceptor tcpPingInterceptor = new TcpPingInterceptor();
-            tcpPingInterceptor.setInterval(100);
-            channel.addInterceptor(tcpPingInterceptor);
-            if (log.isDebugEnabled()) {
-                log.debug("Added TCP Ping Interceptor");
-            }
-        }
-
-        // Add the NonBlockingCoordinator. This is used for leader election
-        /*nbc = new NonBlockingCoordinator() {
-            public void fireInterceptorEvent(InterceptorEvent event) {
-                String status = event.getEventTypeDesc();
-                System.err.println("$$$$$$$$$$$$ NBC status=" + status);
-                int type = event.getEventType();
-            }
-        };
-        nbc.setPrevious(dfi);
-        channel.addInterceptor(nbc);*/
-
-        // Add a reliable failure detector
-        TcpFailureDetector tcpFailureDetector = new TcpFailureDetector();
-//        tcpFailureDetector.setPrevious(dfi); //TODO: check this
-//        tcpFailureDetector.setReadTestTimeout(30000);
-        tcpFailureDetector.setConnectTimeout(30000);
-        channel.addInterceptor(tcpFailureDetector);
-        if (log.isDebugEnabled()) {
-            log.debug("Added TCP Failure Detector");
-        }
-
-        // Add a DomainFilterInterceptor
-        channel.getMembershipService().setDomain(domain);
-        if (!loadBalanceMode) {
-            DomainFilterInterceptor dfi = new DomainFilterInterceptor();
-            dfi.setDomain(domain);
-            channel.addInterceptor(dfi);
-            if (log.isDebugEnabled()) {
-                log.debug("Added Domain Filter Interceptor");
-            }
-        } else {
-            LoadBalancerInterceptor lbInterceptor =
-                    new LoadBalancerInterceptor(domain, lbEventHandlers);
-            channel.addInterceptor(lbInterceptor);
-            if (log.isDebugEnabled()) {
-                log.debug("Added Load Balancer Interceptor");
-            }
-        }
-
-        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
-        atMostOnceInterceptor.setOptionFlag(TribesConstants.AT_MOST_ONCE_OPTION);
-        channel.addInterceptor(atMostOnceInterceptor);
-        if (log.isDebugEnabled()) {
-            log.debug("Added At-most-once Interceptor");
-        }
-
-        // Add the OrderInterceptor to preserve sender ordering
-        OrderInterceptor orderInterceptor = new OrderInterceptor();
-        orderInterceptor.setOptionFlag(TribesConstants.MSG_ORDER_OPTION);
-        channel.addInterceptor(orderInterceptor);
-        if (log.isDebugEnabled()) {
-            log.debug("Added Message Order Interceptor");
-        }
-
-        if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
-            staticMembershipInterceptor = new StaticMembershipInterceptor();
-            channel.addInterceptor(staticMembershipInterceptor);
-            if (log.isDebugEnabled()) {
-                log.debug("Added Static Membership Interceptor");
-            }
-        }
-    }
-
-    /**
-     * If a multicast based membership management scheme is used, configure the multicasting related
-     * parameters
-     *
-     * @param channel The Tribes channel
-     * @param domain  The clustering loadBalancerDomain to which this node belongs to
-     * @throws ClusteringFault If an error occurs while obtaining the local host address
-     */
-    private void configureMulticastBasedMembership(ManagedChannel channel,
-                                                   byte[] domain) throws ClusteringFault {
-        Properties mcastProps = channel.getMembershipService().getProperties();
-        Parameter mcastAddress = getParameter(TribesConstants.MCAST_ADDRESS);
-        if (mcastAddress != null) {
-            mcastProps.setProperty(TribesConstants.MCAST_ADDRESS,
-                                   ((String) mcastAddress.getValue()).trim());
-        }
-        Parameter mcastBindAddress = getParameter(TribesConstants.MCAST_BIND_ADDRESS);
-        if (mcastBindAddress != null) {
-            mcastProps.setProperty(TribesConstants.MCAST_BIND_ADDRESS,
-                                   ((String) mcastBindAddress.getValue()).trim());
-        }
-
-        Parameter mcastPort = getParameter(TribesConstants.MCAST_PORT);
-        if (mcastPort != null) {
-            mcastProps.setProperty(TribesConstants.MCAST_PORT,
-                                   ((String) mcastPort.getValue()).trim());
-        }
-        Parameter mcastFrequency = getParameter(TribesConstants.MCAST_FREQUENCY);
-        if (mcastFrequency != null) {
-            mcastProps.setProperty(TribesConstants.MCAST_FREQUENCY,
-                                   ((String) mcastFrequency.getValue()).trim());
-        }
-        Parameter mcastMemberDropTime = getParameter(TribesConstants.MEMBER_DROP_TIME);
-        if (mcastMemberDropTime != null) {
-            mcastProps.setProperty(TribesConstants.MEMBER_DROP_TIME,
-                                   ((String) mcastMemberDropTime.getValue()).trim());
-        }
-
-        // Set the IP address that will be advertised by this node
-        ReceiverBase receiver = (ReceiverBase) channel.getChannelReceiver();
-        Parameter tcpListenHost = getParameter(TribesConstants.LOCAL_MEMBER_HOST);
-        if (tcpListenHost != null) {
-            String host = ((String) tcpListenHost.getValue()).trim();
-            mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
-            mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
-            receiver.setAddress(host);
-        } else {
-            String host;
-            try {
-                host = Utils.getIpAddress();
-            } catch (SocketException e) {
-                String msg = "Could not get local IP address";
-                log.error(msg, e);
-                throw new ClusteringFault(msg, e);
-            }
-            mcastProps.setProperty(TribesConstants.TCP_LISTEN_HOST, host);
-            mcastProps.setProperty(TribesConstants.BIND_ADDRESS, host);
-            receiver.setAddress(host);
-        }
-        String localIP = System.getProperty(ClusteringConstants.LOCAL_IP_ADDRESS);
-        if (localIP != null) {
-            receiver.setAddress(localIP);
-        }
-
-        Parameter tcpListenPort = getParameter(TribesConstants.LOCAL_MEMBER_PORT);
-        if (tcpListenPort != null) {
-            String port = ((String) tcpListenPort.getValue()).trim();
-            mcastProps.setProperty(TribesConstants.TCP_LISTEN_PORT, port);
-            receiver.setPort(Integer.parseInt(port));
-        }
-
-        mcastProps.setProperty(TribesConstants.MCAST_CLUSTER_DOMAIN, new String(domain));
+        membershipScheme.init();
     }
 
     /**

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=671600&r1=671599&r2=671600&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Wed Jun 25 09:42:19 2008
@@ -24,8 +24,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-/**                                                                          In
- * 
+/**                                                                          
+ * Membership changes are notified using this class
  */
 public class TribesMembershipListener implements MembershipListener {
 
@@ -37,7 +37,6 @@
     }
 
     public void memberAdded(Member member) {
-
         if (membershipManager.memberAdded(member)) {
             log.info("New member " + TribesUtil.getName(member) + " joined cluster.");
         }