You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/25 06:59:16 UTC

svn commit: r659911 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes: MembershipManager.java RpcRequestHandler.java TribesClusterManager.java

Author: azeez
Date: Sat May 24 21:59:15 2008
New Revision: 659911

URL: http://svn.apache.org/viewvc?rev=659911&view=rev
Log:
When a response is received from a well-known member, we will the receiver will replace the details of the WKA member obtained from the configuration file with those sent by the WKA member


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=659911&r1=659910&r2=659911&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Sat May 24 21:59:15 2008
@@ -76,6 +76,10 @@
         wkaMembers.add(wkaMember);
     }
 
+    public void removeWellKnownMember(Member wkaMember) {
+        wkaMembers.remove(wkaMember);
+    }
+
     /**
      * A new member is added
      *

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=659911&r1=659910&r2=659911&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Sat May 24 21:59:15 2008
@@ -145,9 +145,7 @@
                 throw new RemoteProcessException(errMsg, e);
             }
         }
-
-        //TODO: If a WKA member fails, it shud figure out the membership. The WKA member write membership to a local file
-        return null;
+        return null;                      
     }
 
     public void leftOver(Serializable msg, Member member) {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=659911&r1=659910&r2=659911&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sat May 24 21:59:15 2008
@@ -178,37 +178,54 @@
         // If a WKA scheme is used, JOIN the group and get the member list
         if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             try {
-                Response[] responses = rpcChannel.send(membershipManager.getMembers(),
-                                                       new JoinGroupCommand(),
-                                                       RpcChannel.FIRST_REPLY,
-                                                       Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                                       10000);
-                if (responses.length > 0) {
-                    Member source = responses[0].getSource();
-                    MemberListCommand command = (MemberListCommand) responses[0].getMessage();
-                    command.setMembershipManager(membershipManager);
-                    command.setStaticMembershipInterceptor(staticMembershipInterceptor);
-                    command.execute(configurationContext);
-
-                    log.info("Sending MEMBER_JOINED to group...");
-                    MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
-                    memberJoinedCommand.setMember(membershipManager.getLocalMember());
-                    try {
-                        Member[] currentMembers = membershipManager.getMembers();
-                        Member[] sendTo = new Member[currentMembers.length - 1];
-                        int j = 0;
-                        for (Member currentMember : currentMembers) {
-                            if (!currentMember.equals(source)) {  // Don't send back to the sender
-                                sendTo[j] = currentMember;
-                                j++;
+                Member[] wkaMembers = membershipManager.getMembers(); // The well-known members
+                for (Member wkaMember : wkaMembers) {
+                    Response[] responses = rpcChannel.send(new Member[]{wkaMember},
+                                                           new JoinGroupCommand(),
+                                                           RpcChannel.FIRST_REPLY,
+                                                           Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                           10000);
+                    if (responses.length > 0) {
+                        Member source = responses[0].getSource();
+                        MemberListCommand command = (MemberListCommand) responses[0].getMessage();
+                        command.setMembershipManager(membershipManager);
+                        command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+                        command.execute(configurationContext);
+
+                        // Replace the WKA member, with the details received from that member
+                        // This is because we may have used a public IP address to point to the
+                        // wka member. But subequently, we need to use the private IP address
+                        StaticMember member = new StaticMember();
+                        member.setHost(source.getHost());
+                        member.setPort(source.getPort());
+                        member.setMemberAliveTime(source.getMemberAliveTime());
+                        member.setPayload("ping".getBytes());
+                        membershipManager.removeWellKnownMember(wkaMember);
+                        staticMembershipInterceptor.removeStaticMember(wkaMember);
+                        membershipManager.addWellKnownMember(member);
+                        membershipManager.memberAdded(member);
+                        staticMembershipInterceptor.memberAdded(member);
+
+                        log.info("Sending MEMBER_JOINED to group...");
+                        MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+                        memberJoinedCommand.setMember(membershipManager.getLocalMember());
+                        try {
+                            Member[] currentMembers = membershipManager.getMembers();
+                            Member[] sendTo = new Member[currentMembers.length - 1];
+                            int j = 0;
+                            for (Member currentMember : currentMembers) {
+                                if (!currentMember.equals(source)) {  // Don't send back to the sender
+                                    sendTo[j] = currentMember;
+                                    j++;
+                                }
                             }
+                            rpcChannel.send(sendTo, memberJoinedCommand, RpcChannel.ALL_REPLY,
+                                            Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+                        } catch (ChannelException e) {
+                            String msg = "Could not send MEMBER_JOINED message to group";
+                            log.error(msg, e);
+                            throw new ClusteringFault(msg, e);
                         }
-                        rpcChannel.send(sendTo, memberJoinedCommand, RpcChannel.ALL_REPLY,
-                                        Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
-                    } catch (ChannelException e) {
-                        String msg = "Could not send MEMBER_JOINED message to group";
-                        log.error(msg, e);
-                        throw new ClusteringFault(msg, e);
                     }
                 }
             } catch (ChannelException e) {