You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/21 20:02:54 UTC

svn commit: r658792 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/wka/JoinGroupCommand.java tribes/ControlCommandProcessor.java tribes/InitializationRequestHandler.java tribes/TribesClusterManager.java

Author: azeez
Date: Wed May 21 11:02:53 2008
New Revision: 658792

URL: http://svn.apache.org/viewvc?rev=658792&view=rev
Log:
Add a member to the group when it sends a JOIN message to a well-known member


Removed:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ControlCommandProcessor.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java?rev=658792&r1=658791&r2=658792&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java Wed May 21 11:02:53 2008
@@ -18,15 +18,17 @@
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  *
  */
 public class JoinGroupCommand extends ControlCommand {
-    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
-        //TODO: Method implementation
-        System.out.println("### EXEC JoinGroupCommand");
 
-        //todo: send member list, add to static member list, send member joined to others
+    private Log log = LogFactory.getLog(JoinGroupCommand.class);
+
+    public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+        log.info("JOIN request received");
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java?rev=658792&r1=658791&r2=658792&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/InitializationRequestHandler.java Wed May 21 11:02:53 2008
@@ -19,14 +19,20 @@
 
 package org.apache.axis2.clustering.tribes;
 
+import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.control.ControlCommand;
 import org.apache.axis2.clustering.control.GetConfigurationCommand;
+import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
+import org.apache.axis2.clustering.control.GetStateResponseCommand;
 import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcCallback;
+import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -38,35 +44,71 @@
 public class InitializationRequestHandler implements RpcCallback {
 
     private static Log log = LogFactory.getLog(InitializationRequestHandler.class);
-    private ControlCommandProcessor controlCommandProcessor;
-
+    private ConfigurationContext configurationContext;
+    private MembershipManager membershipManager;
+    private StaticMembershipInterceptor staticMembershipInterceptor;
+
+    public InitializationRequestHandler(ConfigurationContext configurationContext, MembershipManager membershipManager,
+                                        StaticMembershipInterceptor staticMembershipInterceptor) {
+        this.configurationContext = configurationContext;
+        this.membershipManager = membershipManager;
+        this.staticMembershipInterceptor = staticMembershipInterceptor;
+    }
 
-    public InitializationRequestHandler(ControlCommandProcessor controlCommandProcessor) {
-        this.controlCommandProcessor = controlCommandProcessor;
+    public void setConfigurationContext(ConfigurationContext configurationContext) {
+        this.configurationContext = configurationContext;
     }
 
     public Serializable replyRequest(Serializable msg, Member member) {
-        if (msg instanceof GetStateCommand ||
-            msg instanceof GetConfigurationCommand) {
+        if (msg instanceof GetStateCommand) {
+            // If a GetStateCommand is received by a node which has not yet been initialized,
+            // this node cannot send a response to the state requester, so we simply return null.
+            if (configurationContext.
+                    getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+                return null;
+            }
             try {
                 log.info("Received " + msg + " initialization request message from " +
                          TribesUtil.getHost(member));
-                return controlCommandProcessor.process((ControlCommand) msg); // response is either GetConfigurationResponseCommand or GetStateResponseCommand
+                GetStateCommand command = (GetStateCommand) msg;
+                command.execute(configurationContext);
+                GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
+                getStateRespCmd.setCommands(((GetStateCommand) command).getCommands());
+                return getStateRespCmd;
             } catch (ClusteringFault e) {
                 String errMsg = "Cannot handle initialization request";
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
-        } else if (msg instanceof JoinGroupCommand) {
-            log.info("Received " + msg + " from " + TribesUtil.getHost(member));
-//            JoinGroupCommand command = (JoinGroupCommand) msg;
+        } else if (msg instanceof GetConfigurationCommand) {
+            // If a GetConfigurationCommand is received by a node which has not yet been initialized,
+            // this node cannot send a response to the configuration requester, so we simply return null.
+            if (configurationContext.
+                    getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+                return null;
+            }
             try {
-                return controlCommandProcessor.process((ControlCommand) msg); // response is 
+                log.info("Received " + msg + " initialization request message from " +
+                         TribesUtil.getHost(member));
+                GetConfigurationCommand command = (GetConfigurationCommand) msg;
+                command.execute(configurationContext);
+                GetConfigurationResponseCommand
+                    getConfigRespCmd = new GetConfigurationResponseCommand();
+                getConfigRespCmd.setServiceGroups(command.getServiceGroupNames());
+                return getConfigRespCmd;
             } catch (ClusteringFault e) {
-                String errMsg = "Cannot handle JOIN request";
+                String errMsg = "Cannot handle initialization request";
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
+        } else if (msg instanceof JoinGroupCommand) {
+            log.info("Received JOIN message from " + TribesUtil.getHost(member));
+            staticMembershipInterceptor.memberAdded(member);
+            membershipManager.memberAdded(member);
+
+            MemberListCommand memListCmd = new MemberListCommand();
+            //todo populate list
+            return memListCmd;
         }
         return null;
     }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=658792&r1=658791&r2=658792&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Wed May 21 11:02:53 2008
@@ -89,16 +89,15 @@
     private ManagedChannel channel;
     private RpcChannel rpcChannel;
     private ConfigurationContext configurationContext;
-    private ControlCommandProcessor controlCmdProcessor;
     private ChannelListener channelListener;
     private ChannelSender channelSender;
     private MembershipManager membershipManager;
+    private InitializationRequestHandler initializationRequestHandler;
     private StaticMembershipInterceptor staticMembershipInterceptor;
     private org.apache.axis2.clustering.Member[] members;
 
     public TribesClusterManager() {
         parameters = new HashMap<String, Parameter>();
-        controlCmdProcessor = new ControlCommandProcessor(configurationContext);
     }
 
     public void setMembers(org.apache.axis2.clustering.Member[] members) {
@@ -164,9 +163,12 @@
 
         // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
         // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
+        initializationRequestHandler = new InitializationRequestHandler(configurationContext,
+                                                                        membershipManager,
+                                                                        staticMembershipInterceptor);
         rpcChannel =
                 new RpcChannel(domain, channel,
-                               new InitializationRequestHandler(controlCmdProcessor));
+                               initializationRequestHandler);
 
         log.info("Local Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers(membershipManager);
@@ -184,7 +186,7 @@
                 }
             } catch (ChannelException e) {
                 String msg = "Could not JOIN group";
-                log.error(e);
+                log.error(msg, e);
                 throw new ClusteringFault(msg, e);
             }
         }
@@ -522,6 +524,10 @@
         nbc.setPrevious(dfi);
         channel.addInterceptor(nbc);*/
 
+        // Add an AtMostOnceInterceptor to support at-most-once message processing semantics
+        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
+        channel.addInterceptor(atMostOnceInterceptor);
+        
         // Add the OrderInterceptor to preserve sender ordering
         OrderInterceptor orderInterceptor = new OrderInterceptor();
         orderInterceptor.setOptionFlag(MSG_ORDER_OPTION);
@@ -536,10 +542,6 @@
             staticMembershipInterceptor = new StaticMembershipInterceptor();
             channel.addInterceptor(staticMembershipInterceptor);
         }
-
-        // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
-        AtMostOnceInterceptor atMostOnceInterceptor = new AtMostOnceInterceptor();
-        channel.addInterceptor(atMostOnceInterceptor);
     }
 
     /**
@@ -740,7 +742,9 @@
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
-        controlCmdProcessor.setConfigurationContext(configurationContext);
+        if (initializationRequestHandler != null) {
+            initializationRequestHandler.setConfigurationContext(configurationContext);
+        }
         if (channelListener != null) {
             channelListener.setConfigurationContext(configurationContext);
         }