You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/23 08:23:45 UTC

svn commit: r659446 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/wka/ tribes/

Author: azeez
Date: Thu May 22 23:23:42 2008
New Revision: 659446

URL: http://svn.apache.org/viewvc?rev=659446&view=rev
Log:
Improvements to WKA membership scheme
1. When a new member joins, a MEMBER_LIST message is sent to it
2. A new member will advertise its newly received group membership to all group members


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java Thu May 22 23:23:42 2008
@@ -15,14 +15,15 @@
  */
 package org.apache.axis2.clustering.control.wka;
 
-import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- *
+ * This is the message a member will send to another member when it intends to join a group.
+ * This is used with WKA based membership
  */
 public class JoinGroupCommand extends ControlCommand {
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java Thu May 22 23:23:42 2008
@@ -20,7 +20,7 @@
 import org.apache.axis2.context.ConfigurationContext;
 
 /**
- *
+ *  When a member wishes to gracefully leave a group, it will send this message to the group
  */
 public class LeaveGroupCommand extends ControlCommand {
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Thu May 22 23:23:42 2008
@@ -21,17 +21,16 @@
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 import java.util.Arrays;
 
 /**
- *
+ * This is the notification message a member will send to all others in the group after it has
+ * joined the group. When the other members received this message, they will add the newly joined
+ * member to their member list
  */
 public class MemberJoinedCommand extends ControlCommand {
 
-    private static final Log log = LogFactory.getLog(MemberJoinedCommand.class);
     private Member member;
     private MembershipManager membershipManager;
     private StaticMembershipInterceptor staticMembershipInterceptor;

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Thu May 22 23:23:42 2008
@@ -18,17 +18,24 @@
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.tribes.MembershipManager;
+import org.apache.axis2.clustering.tribes.TribesUtil;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.util.Arrays;
 
 /**
- *
+ * When a new member wishes to join a group, it will send a {@link JoinGroupCommand} message to
+ * a known member. Then this known member will respond with this MemberListCommand message.
+ * This message will contain a list of all current members.
  */
 public class MemberListCommand extends ControlCommand {
 
+    private static final Log log = LogFactory.getLog(MemberListCommand.class);
+
     private Member[] members;
     private MembershipManager membershipManager;
     private StaticMembershipInterceptor staticMembershipInterceptor;
@@ -37,8 +44,7 @@
         this.membershipManager = membershipManager;
     }
 
-    public void setStaticMembershipInterceptor(
-            StaticMembershipInterceptor staticMembershipInterceptor) {
+    public void setStaticMembershipInterceptor(StaticMembershipInterceptor staticMembershipInterceptor) {
         this.staticMembershipInterceptor = staticMembershipInterceptor;
     }
 
@@ -47,10 +53,12 @@
     }
 
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        log.info("Received MEMBER_LIST message");
         for (Member member : members) {
             Member localMember = membershipManager.getLocalMember();
             if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
                   localMember.getPort() == member.getPort())) {
+                log.info("Added member " + TribesUtil.getHost(member));
                 membershipManager.memberAdded(member);
                 staticMembershipInterceptor.memberAdded(member);
             }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Thu May 22 23:23:42 2008
@@ -29,11 +29,9 @@
 import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
 import org.apache.axis2.clustering.control.wka.MemberListCommand;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -49,7 +47,6 @@
     private ConfigurationContext configurationContext;
     private MembershipManager membershipManager;
     private StaticMembershipInterceptor staticMembershipInterceptor;
-    private RpcChannel rpcChannel;
 
     public RpcRequestHandler(ConfigurationContext configurationContext,
                                         MembershipManager membershipManager,
@@ -59,15 +56,11 @@
         this.staticMembershipInterceptor = staticMembershipInterceptor;
     }
 
-    public void setRpcChannel(RpcChannel rpcChannel) {
-        this.rpcChannel = rpcChannel;
-    }
-
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
     }
 
-    public Serializable replyRequest(Serializable msg, Member member) {
+    public Serializable replyRequest(Serializable msg, Member invoker) {
         if (msg instanceof GetStateCommand) {
             // If a GetStateRequest is received by a node which has not yet initialized
             // this node cannot send a response to the state requester. So we simply return.
@@ -77,7 +70,7 @@
             }
             try {
                 log.info("Received " + msg + " initialization request message from " +
-                         TribesUtil.getHost(member));
+                         TribesUtil.getHost(invoker));
                 GetStateCommand command = (GetStateCommand) msg;
                 command.execute(configurationContext);
                 GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
@@ -97,7 +90,7 @@
             }
             try {
                 log.info("Received " + msg + " initialization request message from " +
-                         TribesUtil.getHost(member));
+                         TribesUtil.getHost(invoker));
                 GetConfigurationCommand command = (GetConfigurationCommand) msg;
                 command.execute(configurationContext);
                 GetConfigurationResponseCommand
@@ -110,25 +103,16 @@
                 throw new RemoteProcessException(errMsg, e);
             }
         } else if (msg instanceof JoinGroupCommand) {
-            log.info("Received JOIN message from " + TribesUtil.getHost(member));
+            log.info("Received JOIN message from " + TribesUtil.getHost(invoker));
             MemberListCommand memListCmd;
             try {
                 // Add the member
-                staticMembershipInterceptor.memberAdded(member);
-                membershipManager.memberAdded(member);
+                staticMembershipInterceptor.memberAdded(invoker);
+                membershipManager.memberAdded(invoker);
 
                 // Return the list of current members to the caller
                 memListCmd = new MemberListCommand();
                 memListCmd.setMembers(membershipManager.getMembers());
-
-                // Send a message to all other informing that a member has joined
-                MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
-                memberJoinedCommand.setMember(member);
-                rpcChannel.send(membershipManager.getMembers(),
-                                memberJoinedCommand,
-                                RpcChannel.ALL_REPLY,
-                                Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                10000);
             } catch (Exception e) {
                 String errMsg = "Cannot handle JOIN request";
                 log.error(errMsg, e);
@@ -136,7 +120,7 @@
             }
             return memListCmd;
         } else if (msg instanceof MemberJoinedCommand) {
-            log.info("Received MEMBER_JOINED message from " + TribesUtil.getHost(member));
+            log.info("Received MEMBER_JOINED message from " + TribesUtil.getHost(invoker));
             try {
                 MemberJoinedCommand command = (MemberJoinedCommand) msg;
                 command.setMembershipManager(membershipManager);

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Thu May 22 23:23:42 2008
@@ -34,6 +34,7 @@
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
 import org.apache.axis2.clustering.control.wka.MemberListCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.HandlerDescription;
@@ -168,7 +169,6 @@
                                                   membershipManager,
                                                   staticMembershipInterceptor);
         rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
-        rpcRequestHandler.setRpcChannel(rpcChannel);
 
 
         log.info("Local Member " + TribesUtil.getLocalHost(channel));
@@ -183,10 +183,35 @@
                                                        Channel.SEND_OPTIONS_ASYNCHRONOUS,
                                                        10000);
                 if (responses.length > 0) {
+                    Member source = responses[0].getSource();
                     MemberListCommand command = (MemberListCommand) responses[0].getMessage();
                     command.setMembershipManager(membershipManager);
                     command.setStaticMembershipInterceptor(staticMembershipInterceptor);
-                    command.execute(configurationContext); // Do the initialization
+                    command.execute(configurationContext);
+
+                    log.info("Sending MEMBER_JOINED to group...");
+                    MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+                    memberJoinedCommand.setMember(membershipManager.getLocalMember());
+                    try {
+                        Member[] currentMembers = membershipManager.getMembers();
+                        Member[] sendTo = new Member[currentMembers.length - 1];
+                        int j = 0;
+                        for (Member currentMember : currentMembers) {
+                            if (!currentMember.equals(source)) {
+                                sendTo[j] = currentMember;
+                                j++;
+                            }
+                        }
+                        rpcChannel.send(sendTo,
+                                        memberJoinedCommand,
+                                        RpcChannel.ALL_REPLY,
+                                        Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                        10000);
+                    } catch (ChannelException e) {
+                        String msg = "Could not send MEMBER_JOINED message to group";
+                        log.error(msg, e);
+                        throw new ClusteringFault(msg, e);
+                    }
                 }
             } catch (ChannelException e) {
                 String msg = "Could not JOIN group";