You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/23 12:31:55 UTC

svn commit: r659492 - in /webservices/axis2/trunk/java/modules: clustering/src/org/apache/axis2/clustering/tribes/ kernel/src/org/apache/axis2/clustering/ kernel/src/org/apache/axis2/deployment/

Author: azeez
Date: Fri May 23 03:31:54 2008
New Revision: 659492

URL: http://svn.apache.org/viewvc?rev=659492&view=rev
Log:
When a WKA member fails and comes back up, other members will have to send the MEMBER_LIST to it


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java
    webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri May 23 03:31:54 2008
@@ -80,8 +80,7 @@
             } catch (ChannelException e) {
                 log.error("Could not send message to some members", e);
                 ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
-                for (int i = 0; i < faultyMembers.length; i++) {
-                    ChannelException.FaultyMember faultyMember = faultyMembers[i];
+                for (ChannelException.FaultyMember faultyMember : faultyMembers) {
                     Member member = faultyMember.getMember();
                     log.error("Member " + TribesUtil.getHost(member) + " is faulty",
                               faultyMember.getCause());

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Fri May 23 03:31:54 2008
@@ -19,8 +19,14 @@
 
 package org.apache.axis2.clustering.tribes;
 
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -30,12 +36,30 @@
  * Responsible for managing the membership. Handles membership changes.
  */
 public class MembershipManager {
+
+    private static final Log log = LogFactory.getLog(MembershipManager.class);
+
+    private RpcChannel rpcChannel;
+
+    public MembershipManager() {
+    }
+
+    public void setRpcChannel(RpcChannel rpcChannel) {
+        this.rpcChannel = rpcChannel;
+    }
+
     /**
-     * List of members in the cluster
+     * List of current members in the cluster. Only the members who are alive will be in this
+     * list
      */
     private final List<Member> members = new ArrayList<Member>();
 
     /**
+     * List of Well-Known members. These members may or may not be alive at a given moment.
+     */
+    private List<Member> wkaMembers = new ArrayList<Member>();
+
+    /**
      * The member representing this node
      */
     private Member localMember;
@@ -48,6 +72,10 @@
         this.localMember = localMember;
     }
 
+    public void addWellKnownMember(Member wkaMember) {
+        wkaMembers.add(wkaMember);
+    }
+
     /**
      * A new member is added
      *
@@ -56,6 +84,24 @@
      */
     public synchronized boolean memberAdded(Member member) {
         if (!members.contains(member)) {
+            if (rpcChannel != null && wkaMembers.contains(member)) { // if it is a well-known member
+
+                log.info("A WKA member " + TribesUtil.getHost(member) +
+                         " just joined the group. Sending MEMBER_LIST message.");
+                // send the member list to it
+                MemberListCommand memListCmd;
+                try {
+                    memListCmd = new MemberListCommand();
+                    memListCmd.setMembers(getMembers());
+                    rpcChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
+                                    Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+                } catch (Exception e) {
+                    String errMsg = "Could not send MEMBER_LIST to well-known member " +
+                                    TribesUtil.getHost(member);
+                    log.error(errMsg, e);
+                    throw new RemoteProcessException(errMsg, e);
+                }
+            }
             members.add(member);
             return true;
         }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Fri May 23 03:31:54 2008
@@ -49,8 +49,8 @@
     private StaticMembershipInterceptor staticMembershipInterceptor;
 
     public RpcRequestHandler(ConfigurationContext configurationContext,
-                                        MembershipManager membershipManager,
-                                        StaticMembershipInterceptor staticMembershipInterceptor) {
+                             MembershipManager membershipManager,
+                             StaticMembershipInterceptor staticMembershipInterceptor) {
         this.configurationContext = configurationContext;
         this.membershipManager = membershipManager;
         this.staticMembershipInterceptor = staticMembershipInterceptor;
@@ -131,6 +131,19 @@
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
+        } else if (msg instanceof MemberListCommand) {
+            try {                    //TODO: What if we receive more than one member list message?
+                MemberListCommand command = (MemberListCommand) msg;
+                command.setMembershipManager(membershipManager);
+                command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+                command.execute(configurationContext);
+
+                //TODO Send MEMBER_JOINED messages to all nodes
+            } catch (ClusteringFault e) {
+                String errMsg = "Cannot handle MEMBER_LIST message";
+                log.error(errMsg, e);
+                throw new RemoteProcessException(errMsg, e);
+            }
         }
 
         //TODO: If a WKA member fails, it shud figure out the membership. The WKA member write membership to a local file

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri May 23 03:31:54 2008
@@ -96,17 +96,17 @@
     private MembershipManager membershipManager;
     private RpcRequestHandler rpcRequestHandler;
     private StaticMembershipInterceptor staticMembershipInterceptor;
-    private org.apache.axis2.clustering.Member[] members;
+    private List<org.apache.axis2.clustering.Member> members;
 
     public TribesClusterManager() {
         parameters = new HashMap<String, Parameter>();
     }
 
-    public void setMembers(org.apache.axis2.clustering.Member[] members) {
+    public void setMembers(List<org.apache.axis2.clustering.Member> members) {
         this.members = members;
     }
 
-    public org.apache.axis2.clustering.Member[] getMembers() {
+    public List<org.apache.axis2.clustering.Member> getMembers() {
         return members;
     }
 
@@ -140,8 +140,8 @@
         addInterceptors(channel, domain, membershipScheme);
 
         // Membership scheme handling
-        //TODO: if it is a WKA scheme, connect to a WKA and get a list of members. Add the members
-        // TODO: to the membership manager
+        // If it is a WKA scheme, connect to a WKA and get a list of members. Add the members
+        // to the membership manager
         configureMembershipScheme(domain, membershipScheme);
 
         channel.addChannelListener(channelListener);
@@ -169,6 +169,7 @@
                                                   membershipManager,
                                                   staticMembershipInterceptor);
         rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+        membershipManager.setRpcChannel(rpcChannel);
 
 
         log.info("Local Member " + TribesUtil.getLocalHost(channel));
@@ -197,16 +198,13 @@
                         Member[] sendTo = new Member[currentMembers.length - 1];
                         int j = 0;
                         for (Member currentMember : currentMembers) {
-                            if (!currentMember.equals(source)) {
+                            if (!currentMember.equals(source)) {  // Don't send back to the sender
                                 sendTo[j] = currentMember;
                                 j++;
                             }
                         }
-                        rpcChannel.send(sendTo,
-                                        memberJoinedCommand,
-                                        RpcChannel.ALL_REPLY,
-                                        Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                        10000);
+                        rpcChannel.send(sendTo, memberJoinedCommand, RpcChannel.ALL_REPLY,
+                                        Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
                     } catch (ChannelException e) {
                         String msg = "Could not send MEMBER_JOINED message to group";
                         log.error(msg, e);
@@ -450,6 +448,7 @@
                 // We will add the member even if it is offline at this moment. When the
                 // member comes online, it will be detected by the GMS
                 staticMembershipInterceptor.addStaticMember(tribesMember);
+                membershipManager.addWellKnownMember(tribesMember);
                 if (canConnect(member)) {
                     membershipManager.memberAdded(tribesMember);
                     log.info("Added static member " + TribesUtil.getHost(tribesMember));

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Fri May 23 03:31:54 2008
@@ -24,8 +24,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.StringTokenizer;
-
 public class TribesUtil {
 
     private static Log log = LogFactory.getLog(TribesUtil.class);

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri May 23 03:31:54 2008
@@ -24,6 +24,8 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.ParameterInclude;
 
+import java.util.List;
+
 /**
  * <p>
  * This is the main interface in the Axis2 clustering implementation.
@@ -135,7 +137,7 @@
      *
      * @param members Members to be added
      */
-    void setMembers(Member[] members);
+    void setMembers(List<Member> members);
 
     /**
      * Get the list of members in a
@@ -146,6 +148,6 @@
      * @return The members if static group membership is used. If any other membership scheme is used,
      *         the values returned may not be valid
      */
-    Member[] getMembers();
+    List<Member> getMembers();
 
 }
\ No newline at end of file

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java Fri May 23 03:31:54 2008
@@ -42,4 +42,25 @@
     public int getPort() {
         return port;
     }
+
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Member member = (Member) o;
+        return port == member.getPort() &&
+               !(hostName != null ? !hostName.equals(member.getHostName()) :
+                 member.getHostName() != null);
+    }
+
+    public int hashCode() {
+        int result;
+        result = (hostName != null ? hostName.hashCode() : 0);
+        result = 31 * result + port;
+        return result;
+    }
 }

Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java Fri May 23 03:31:54 2008
@@ -118,12 +118,12 @@
     }
 
     private void loadMembers(ClusterManager clusterManager, OMElement clusterElement) {
-        clusterManager.setMembers(new Member[0]);
+        clusterManager.setMembers(new ArrayList<Member>());
         Parameter membershipSchemeParam = clusterManager.getParameter("membershipScheme");
         if (membershipSchemeParam != null) {
             String membershipScheme = ((String) membershipSchemeParam.getValue()).trim();
             if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
-                List members = new ArrayList();
+                List<Member> members = new ArrayList<Member>();
                 OMElement membersEle =
                         clusterElement.getFirstChildWithName(new QName("members"));
                 if (membersEle != null) {
@@ -137,7 +137,7 @@
                         members.add(new Member(hostName, Integer.parseInt(port)));
                     }
                 }
-                clusterManager.setMembers((Member[]) members.toArray(new Member[members.size()]));
+                clusterManager.setMembers(members);
             }
         }
     }