You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/21 23:20:49 UTC

svn commit: r658880 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/wka/ tribes/

Author: azeez
Date: Wed May 21 14:20:48 2008
New Revision: 658880

URL: http://svn.apache.org/viewvc?rev=658880&view=rev
Log:
When a new member joins by notifying a well-known address (WKA) member, that WKA member should notify the other members.


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
      - copied, changed from r658777, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
Removed:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java Wed May 21 14:20:48 2008
@@ -15,10 +15,16 @@
  */
 package org.apache.axis2.clustering.control.wka;
 
-import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.context.ConfigurationContext;
 
 /**
  *
  */
-public class LeaveGroupCommand extends ChannelData {
+public class LeaveGroupCommand extends ControlCommand {
+    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        //TODO: Method implementation
+
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Wed May 21 14:20:48 2008
@@ -15,10 +15,46 @@
  */
 package org.apache.axis2.clustering.control.wka;
 
-import org.apache.catalina.tribes.io.ChannelData;
+import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.tribes.MembershipManager;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Arrays;
 
 /**
  *
  */
-public class MemberJoinedCommand extends ChannelData {
+public class MemberJoinedCommand extends ControlCommand {
+
+    private static final Log log = LogFactory.getLog(MemberJoinedCommand.class);
+    private Member member;
+    private MembershipManager membershipManager;
+    private StaticMembershipInterceptor staticMembershipInterceptor;
+
+    public void setMembershipManager(MembershipManager membershipManager) {
+        this.membershipManager = membershipManager;
+    }
+
+    public void setStaticMembershipInterceptor(
+            StaticMembershipInterceptor staticMembershipInterceptor) {
+        this.staticMembershipInterceptor = staticMembershipInterceptor;
+    }
+
+    public void setMember(Member member) {
+        this.member = member;
+    }
+
+    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        Member localMember = membershipManager.getLocalMember();
+        if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
+              localMember.getPort() == member.getPort())) {
+            membershipManager.memberAdded(member);
+            staticMembershipInterceptor.memberAdded(member);
+        }
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Wed May 21 14:20:48 2008
@@ -15,16 +15,45 @@
  */
 package org.apache.axis2.clustering.control.wka;
 
-import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
+import org.apache.axis2.clustering.tribes.MembershipManager;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+
+import java.util.Arrays;
 
 /**
  *
  */
 public class MemberListCommand extends ControlCommand {
+
+    private Member[] members;
+    private MembershipManager membershipManager;
+    private StaticMembershipInterceptor staticMembershipInterceptor;
+
+    public void setMembershipManager(MembershipManager membershipManager) {
+        this.membershipManager = membershipManager;
+    }
+
+    public void setStaticMembershipInterceptor(
+            StaticMembershipInterceptor staticMembershipInterceptor) {
+        this.staticMembershipInterceptor = staticMembershipInterceptor;
+    }
+
+    public void setMembers(Member[] members) {
+        this.members = members;
+    }
+
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
-        //TODO: Method implementation
-        System.out.println("#### MEMBER LIST CMD");
+        for (Member member : members) {
+            Member localMember = membershipManager.getLocalMember();
+            if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
+                  localMember.getPort() == member.getPort())) {
+                membershipManager.memberAdded(member);
+                staticMembershipInterceptor.memberAdded(member);
+            }
+        }
     }
 }

Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (from r658777, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java&r1=658777&r2=658880&rev=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Wed May 21 14:20:48 2008
@@ -19,55 +19,137 @@
 
 package org.apache.axis2.clustering.tribes;
 
+import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
+import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.RpcChannel;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.io.Serializable;
 
 /**
- * Handles initialization requests(GetConfiguration & GetState) from newly joining members
+ * Handles RPC Channel requests from members
  */
-public class InitializationRequestHandler implements RpcCallback {
+public class RpcRequestHandler implements RpcCallback {
 
-    private static Log log = LogFactory.getLog(InitializationRequestHandler.class);
-    private ControlCommandProcessor controlCommandProcessor;
+    private static Log log = LogFactory.getLog(RpcRequestHandler.class);
+    private ConfigurationContext configurationContext;
+    private MembershipManager membershipManager;
+    private StaticMembershipInterceptor staticMembershipInterceptor;
+    private RpcChannel rpcChannel;
+
+    public RpcRequestHandler(ConfigurationContext configurationContext,
+                                        MembershipManager membershipManager,
+                                        StaticMembershipInterceptor staticMembershipInterceptor) {
+        this.configurationContext = configurationContext;
+        this.membershipManager = membershipManager;
+        this.staticMembershipInterceptor = staticMembershipInterceptor;
+    }
 
+    public void setRpcChannel(RpcChannel rpcChannel) {
+        this.rpcChannel = rpcChannel;
+    }
 
-    public InitializationRequestHandler(ControlCommandProcessor controlCommandProcessor) {
-        this.controlCommandProcessor = controlCommandProcessor;
+    public void setConfigurationContext(ConfigurationContext configurationContext) {
+        this.configurationContext = configurationContext;
     }
 
     public Serializable replyRequest(Serializable msg, Member member) {
-        if (msg instanceof GetStateCommand ||
-            msg instanceof GetConfigurationCommand) {
+        if (msg instanceof GetStateCommand) {
+            // If a GetStateCommand is received by a node which has not yet been initialized,
+            // this node cannot send a response to the state requester, so we simply return null.
+            if (configurationContext.
+                    getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+                return null;
+            }
             try {
                 log.info("Received " + msg + " initialization request message from " +
                          TribesUtil.getHost(member));
-                return controlCommandProcessor.process((ControlCommand) msg); // response is either GetConfigurationResponseCommand or GetStateResponseCommand
+                GetStateCommand command = (GetStateCommand) msg;
+                command.execute(configurationContext);
+                GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
+                getStateRespCmd.setCommands(command.getCommands());
+                return getStateRespCmd;
             } catch (ClusteringFault e) {
                 String errMsg = "Cannot handle initialization request";
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
-        } else if (msg instanceof JoinGroupCommand) {
-            log.info("Received " + msg + " from " + TribesUtil.getHost(member));
-//            JoinGroupCommand command = (JoinGroupCommand) msg;
+        } else if (msg instanceof GetConfigurationCommand) {
+            // If a GetConfigurationCommand is received by a node which has not yet been initialized,
+            // this node cannot send a response to the configuration requester, so we simply return null.
+            if (configurationContext.
+                    getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+                return null;
+            }
             try {
-                return controlCommandProcessor.process((ControlCommand) msg); // response is 
+                log.info("Received " + msg + " initialization request message from " +
+                         TribesUtil.getHost(member));
+                GetConfigurationCommand command = (GetConfigurationCommand) msg;
+                command.execute(configurationContext);
+                GetConfigurationResponseCommand
+                        getConfigRespCmd = new GetConfigurationResponseCommand();
+                getConfigRespCmd.setServiceGroups(command.getServiceGroupNames());
+                return getConfigRespCmd;
             } catch (ClusteringFault e) {
+                String errMsg = "Cannot handle initialization request";
+                log.error(errMsg, e);
+                throw new RemoteProcessException(errMsg, e);
+            }
+        } else if (msg instanceof JoinGroupCommand) {
+            log.info("Received JOIN message from " + TribesUtil.getHost(member));
+            MemberListCommand memListCmd;
+            try {
+                // Add the member
+                staticMembershipInterceptor.memberAdded(member);
+                membershipManager.memberAdded(member);
+
+                // Return the list of current members to the caller
+                memListCmd = new MemberListCommand();
+                memListCmd.setMembers(membershipManager.getMembers());
+
+                // Send a message to all other members informing them that a member has joined
+                MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+                memberJoinedCommand.setMember(member);
+                rpcChannel.send(membershipManager.getMembers(),
+                                memberJoinedCommand,
+                                RpcChannel.ALL_REPLY,
+                                Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                10000);
+            } catch (Exception e) {
                 String errMsg = "Cannot handle JOIN request";
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
+            return memListCmd;
+        } else if (msg instanceof MemberJoinedCommand) {
+            log.info("Received MEMBER_JOINED message from " + TribesUtil.getHost(member));
+            try {
+                MemberJoinedCommand command = (MemberJoinedCommand) msg;
+                command.setMembershipManager(membershipManager);
+                command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+                command.execute(configurationContext);
+            } catch (ClusteringFault e) {
+                String errMsg = "Cannot handle MEMBER_JOINED notification";
+                log.error(errMsg, e);
+                throw new RemoteProcessException(errMsg, e);
+            }
         }
+
+        //TODO: If a WKA member fails, it should figure out the membership. The WKA member should write the membership to a local file
         return null;
     }
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=658880&r1=658879&r2=658880&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 21 14:20:48 2008
@@ -34,6 +34,7 @@
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.description.HandlerDescription;
 import org.apache.axis2.description.Parameter;
@@ -92,7 +93,7 @@
     private ChannelListener channelListener;
     private ChannelSender channelSender;
     private MembershipManager membershipManager;
-    private InitializationRequestHandler initializationRequestHandler;
+    private RpcRequestHandler rpcRequestHandler;
     private StaticMembershipInterceptor staticMembershipInterceptor;
     private org.apache.axis2.clustering.Member[] members;
 
@@ -163,12 +164,12 @@
 
         // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
         // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
-        initializationRequestHandler = new InitializationRequestHandler(configurationContext,
-                                                                        membershipManager,
-                                                                        staticMembershipInterceptor);
-        rpcChannel =
-                new RpcChannel(domain, channel,
-                               initializationRequestHandler);
+        rpcRequestHandler = new RpcRequestHandler(configurationContext,
+                                                  membershipManager,
+                                                  staticMembershipInterceptor);
+        rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+        rpcRequestHandler.setRpcChannel(rpcChannel);
+
 
         log.info("Local Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers(membershipManager);
@@ -182,7 +183,10 @@
                                                        Channel.SEND_OPTIONS_ASYNCHRONOUS,
                                                        10000);
                 if (responses.length > 0) {
-                    ((ControlCommand) responses[0].getMessage()).execute(configurationContext); // Do the initialization
+                    MemberListCommand command = (MemberListCommand) responses[0].getMessage();
+                    command.setMembershipManager(membershipManager);
+                    command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+                    command.execute(configurationContext); // Do the initialization
                 }
             } catch (ChannelException e) {
                 String msg = "Could not JOIN group";
@@ -527,7 +531,7 @@
         // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
         AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
         channel.addInterceptor(atMostOnceInterceptor);
-        
+
         // Add the OrderInterceptor to preserve sender ordering
         OrderInterceptor orderInterceptor = new OrderInterceptor();
         orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
@@ -742,8 +746,8 @@
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
-        if (initializationRequestHandler != null) {
-            initializationRequestHandler.setConfigurationContext(configurationContext);
+        if (rpcRequestHandler != null) {
+            rpcRequestHandler.setConfigurationContext(configurationContext);
         }
         if (channelListener != null) {
             channelListener.setConfigurationContext(configurationContext);