You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/30 13:13:47 UTC

svn commit: r672741 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/ control/wka/ tribes/

Author: azeez
Date: Mon Jun 30 04:13:47 2008
New Revision: 672741

URL: http://svn.apache.org/viewvc?rev=672741&view=rev
Log:
Introducing separate RPC channel for membership messages (when WKA membership is used) & init messages


Added:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
      - copied, changed from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java
      - copied, changed from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
Removed:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Mon Jun 30 04:13:47 2008
@@ -48,12 +48,12 @@
         ContextManager contextManager = clusterManager.getContextManager();
         if (contextManager != null) {
             Map excludedPropPatterns = contextManager.getReplicationExcludePatterns();
-            List cmdList = new ArrayList();
+            List<ContextClusteringCommand> cmdList = new ArrayList<ContextClusteringCommand>();
 
             // Add the service group contexts, service contexts & their respective properties
             String[] sgCtxIDs = configCtx.getServiceGroupContextIDs();
-            for (int i  = 0; i < sgCtxIDs.length; i ++) {
-                ServiceGroupContext sgCtx = configCtx.getServiceGroupContext(sgCtxIDs[i]);
+            for (String sgCtxID : sgCtxIDs) {
+                ServiceGroupContext sgCtx = configCtx.getServiceGroupContext(sgCtxID);
                 ContextClusteringCommand updateServiceGroupCtxCmd =
                         ContextClusteringCommandFactory.getUpdateCommand(sgCtx,
                                                                          excludedPropPatterns,
@@ -83,8 +83,7 @@
                 cmdList.add(updateCmd);
             }
             if (!cmdList.isEmpty()) {
-                commands = (ContextClusteringCommand[]) cmdList.
-                        toArray(new ContextClusteringCommand[cmdList.size()]);
+                commands = cmdList.toArray(new ContextClusteringCommand[cmdList.size()]);
             }
         }
     }

Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java (from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java&r1=671600&r2=672741&rev=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java Mon Jun 30 04:13:47 2008
@@ -17,18 +17,11 @@
  * under the License.
  */
 
-package org.apache.axis2.clustering.tribes;
+package org.apache.axis2.clustering.control.wka;
 
-import org.apache.axis2.clustering.ClusteringConstants;
 import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.control.GetConfigurationCommand;
-import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
-import org.apache.axis2.clustering.control.GetStateCommand;
-import org.apache.axis2.clustering.control.GetStateResponseCommand;
-import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
-import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
-import org.apache.axis2.clustering.control.wka.MemberListCommand;
-import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.clustering.tribes.MembershipManager;
+import org.apache.axis2.clustering.tribes.TribesUtil;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.RpcCallback;
@@ -38,80 +31,41 @@
 import java.io.Serializable;
 
 /**
- * Handles RPC Channel requests from members
+ * Handles RPC membership requests from members. This is used only in conjunction with WKA based
+ * membership management
  */
-public class RpcRequestHandler implements RpcCallback {
+public class RpcMembershipRequestHandler implements RpcCallback {
 
-    private static Log log = LogFactory.getLog(RpcRequestHandler.class);
-    private ConfigurationContext configurationContext;
+    private static Log log = LogFactory.getLog(RpcMembershipRequestHandler.class);
     private MembershipManager membershipManager;
 
-    public RpcRequestHandler(ConfigurationContext configurationContext,
-                             MembershipManager membershipManager) {
-        this.configurationContext = configurationContext;
+    public RpcMembershipRequestHandler(MembershipManager membershipManager) {
         this.membershipManager = membershipManager;
     }
 
-    public void setConfigurationContext(ConfigurationContext configurationContext) {
-        this.configurationContext = configurationContext;
-    }
+    public Serializable replyRequest(Serializable msg, Member sender) {
+        String domain = new String(sender.getDomain());
+        if (log.isDebugEnabled()) {
+            log.debug("Membership request received by RpcMembershipRequestHandler for domain " +
+                      domain);
+        }
 
-    public Serializable replyRequest(Serializable msg, Member invoker) {
-        if (msg instanceof GetStateCommand) {
-            // If a GetStateRequest is received by a node which has not yet initialized
-            // this node cannot send a response to the state requester. So we simply return.
-            if (configurationContext.
-                    getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
-                return null;
-            }
-            try {
-                log.info("Received " + msg + " initialization request message from " +
-                         TribesUtil.getName(invoker));
-                GetStateCommand command = (GetStateCommand) msg;
-                command.execute(configurationContext);
-                GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
-                getStateRespCmd.setCommands(command.getCommands());
-                return getStateRespCmd;
-            } catch (ClusteringFault e) {
-                String errMsg = "Cannot handle initialization request";
-                log.error(errMsg, e);
-                throw new RemoteProcessException(errMsg, e);
-            }
-        } else if (msg instanceof GetConfigurationCommand) {
-            // If a GetConfigurationCommand is received by a node which has not yet initialized
-            // this node cannot send a response to the state requester. So we simply return.
-            if (configurationContext.
-                    getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
-                return null;
-            }
-            try {
-                log.info("Received " + msg + " initialization request message from " +
-                         TribesUtil.getName(invoker));
-                GetConfigurationCommand command = (GetConfigurationCommand) msg;
-                command.execute(configurationContext);
-                GetConfigurationResponseCommand
-                        getConfigRespCmd = new GetConfigurationResponseCommand();
-                getConfigRespCmd.setServiceGroups(command.getServiceGroupNames());
-                return getConfigRespCmd;
-            } catch (ClusteringFault e) {
-                String errMsg = "Cannot handle initialization request";
-                log.error(errMsg, e);
-                throw new RemoteProcessException(errMsg, e);
-            }
-        } else if (msg instanceof JoinGroupCommand) {
-            log.info("Received JOIN message from " + TribesUtil.getName(invoker));
-            membershipManager.memberAdded(invoker);
+        if (msg instanceof JoinGroupCommand) {
+            log.info("Received JOIN message from " + TribesUtil.getName(sender) +
+                     TribesUtil.getName(sender) + " in domain " + domain);
+            membershipManager.memberAdded(sender);
 
             // Return the list of current members to the caller
             MemberListCommand memListCmd = new MemberListCommand();
             memListCmd.setMembers(membershipManager.getMembers());
             return memListCmd;
         } else if (msg instanceof MemberJoinedCommand) {
-            log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(invoker));
+            log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(sender) +
+                     TribesUtil.getName(sender) + " in domain " + domain);
             try {
                 MemberJoinedCommand command = (MemberJoinedCommand) msg;
                 command.setMembershipManager(membershipManager);
-                command.execute(configurationContext);
+                command.execute(null);
             } catch (ClusteringFault e) {
                 String errMsg = "Cannot handle MEMBER_JOINED notification";
                 log.error(errMsg, e);
@@ -121,7 +75,7 @@
             try {                    //TODO: What if we receive more than one member list message?
                 MemberListCommand command = (MemberListCommand) msg;
                 command.setMembershipManager(membershipManager);
-                command.execute(configurationContext);
+                command.execute(null);
 
                 //TODO Send MEMBER_JOINED messages to all nodes
             } catch (ClusteringFault e) {
@@ -137,4 +91,4 @@
         //TODO: Method implementation
 
     }
-}
+}
\ No newline at end of file

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jun 30 04:13:47 2008
@@ -47,7 +47,7 @@
 
     private static final Log log = LogFactory.getLog(MembershipManager.class);
 
-    private RpcChannel rpcChannel;
+    private RpcChannel rpcMembershipChannel;
     private StaticMembershipInterceptor staticMembershipInterceptor;
 
     /**
@@ -64,8 +64,8 @@
     public MembershipManager() {
     }
 
-    public void setRpcChannel(RpcChannel rpcChannel) {
-        this.rpcChannel = rpcChannel;
+    public void setRpcMembershipChannel(RpcChannel rpcMembershipChannel) {
+        this.rpcMembershipChannel = rpcMembershipChannel;
     }
 
     public void setStaticMembershipInterceptor(
@@ -150,7 +150,7 @@
         }
 
         if (shouldAddMember) {
-            if (rpcChannel != null && isLocalMemberInitialized() &&
+            if (rpcMembershipChannel != null && isLocalMemberInitialized() &&
                 wkaMembers.contains(member)) { // if it is a well-known member
 
                 log.info("A WKA member " + TribesUtil.getName(member) +
@@ -162,7 +162,7 @@
                     List<Member> members = new ArrayList<Member>(this.members);
                     members.add(localMember); // Need to set the local member too
                     memListCmd.setMembers(members.toArray(new Member[members.size()]));
-                    rpcChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
+                    rpcMembershipChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
                                     Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
                 } catch (Exception e) {
                     String errMsg = "Could not send MEMBER_LIST to well-known member " +

Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java (from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java&r1=671600&r2=672741&rev=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java Mon Jun 30 04:13:47 2008
@@ -25,9 +25,6 @@
 import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
 import org.apache.axis2.clustering.control.GetStateCommand;
 import org.apache.axis2.clustering.control.GetStateResponseCommand;
-import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
-import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
-import org.apache.axis2.clustering.control.wka.MemberListCommand;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.catalina.tribes.Member;
 import org.apache.catalina.tribes.RemoteProcessException;
@@ -40,16 +37,13 @@
 /**
  * Handles RPC Channel requests from members
  */
-public class RpcRequestHandler implements RpcCallback {
+public class RpcInitializationRequestHandler implements RpcCallback {
 
-    private static Log log = LogFactory.getLog(RpcRequestHandler.class);
+    private static Log log = LogFactory.getLog(RpcInitializationRequestHandler.class);
     private ConfigurationContext configurationContext;
-    private MembershipManager membershipManager;
 
-    public RpcRequestHandler(ConfigurationContext configurationContext,
-                             MembershipManager membershipManager) {
+    public RpcInitializationRequestHandler(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
-        this.membershipManager = membershipManager;
     }
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
@@ -57,6 +51,9 @@
     }
 
     public Serializable replyRequest(Serializable msg, Member invoker) {
+        if (log.isDebugEnabled()) {
+            log.debug("Initialization request received by RpcInitializationRequestHandler");
+        }
         if (msg instanceof GetStateCommand) {
             // If a GetStateRequest is received by a node which has not yet initialized
             // this node cannot send a response to the state requester. So we simply return.
@@ -98,38 +95,7 @@
                 log.error(errMsg, e);
                 throw new RemoteProcessException(errMsg, e);
             }
-        } else if (msg instanceof JoinGroupCommand) {
-            log.info("Received JOIN message from " + TribesUtil.getName(invoker));
-            membershipManager.memberAdded(invoker);
-
-            // Return the list of current members to the caller
-            MemberListCommand memListCmd = new MemberListCommand();
-            memListCmd.setMembers(membershipManager.getMembers());
-            return memListCmd;
-        } else if (msg instanceof MemberJoinedCommand) {
-            log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(invoker));
-            try {
-                MemberJoinedCommand command = (MemberJoinedCommand) msg;
-                command.setMembershipManager(membershipManager);
-                command.execute(configurationContext);
-            } catch (ClusteringFault e) {
-                String errMsg = "Cannot handle MEMBER_JOINED notification";
-                log.error(errMsg, e);
-                throw new RemoteProcessException(errMsg, e);
-            }
-        } else if (msg instanceof MemberListCommand) {
-            try {                    //TODO: What if we receive more than one member list message?
-                MemberListCommand command = (MemberListCommand) msg;
-                command.setMembershipManager(membershipManager);
-                command.execute(configurationContext);
-
-                //TODO Send MEMBER_JOINED messages to all nodes
-            } catch (ClusteringFault e) {
-                String errMsg = "Cannot handle MEMBER_LIST message";
-                log.error(errMsg, e);
-                throw new RemoteProcessException(errMsg, e);
-            }
-        }
+        } 
         return null;
     }
 
@@ -137,4 +103,4 @@
         //TODO: Method implementation
 
     }
-}
+}
\ No newline at end of file

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jun 30 04:13:47 2008
@@ -75,12 +75,13 @@
 
     private HashMap<String, Parameter> parameters;
     private ManagedChannel channel;
-    private RpcChannel rpcChannel;
+    private RpcChannel initRpcChannel;
+    private RpcChannel membershipRpcChannel;
     private ConfigurationContext configurationContext;
     private ChannelListener channelListener;
     private ChannelSender channelSender;
     private MembershipManager membershipManager;
-    private RpcRequestHandler rpcRequestHandler;
+    private RpcInitializationRequestHandler rpcInitRequestHandler;
     private MembershipScheme membershipScheme;
 
     /**
@@ -151,12 +152,13 @@
 
         // RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
         // picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
-        rpcRequestHandler = new RpcRequestHandler(configurationContext, membershipManager);
-        rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+        rpcInitRequestHandler = new RpcInitializationRequestHandler(configurationContext);
+        initRpcChannel =
+                new RpcChannel(TribesUtil.getRpcInitChannelId(domain),
+                               channel, rpcInitRequestHandler);
         if (log.isDebugEnabled()) {
             log.debug("Created RPC Channel for domain " + new String(domain));
         }
-        membershipManager.setRpcChannel(rpcChannel);
 
         setMaximumRetries();
         configureMode(domain);
@@ -351,12 +353,12 @@
     /**
      * Handle specific configurations related to different membership management schemes.
      *
-     * @param domain             The clustering loadBalancerDomain to which this member belongs to
+     * @param localDomain        The clustering loadBalancerDomain to which this member belongs to
      * @param membershipManagers MembershipManagers for different domains
      * @throws ClusteringFault If the membership scheme is invalid, or if an error occurs
      *                         while configuring membership scheme
      */
-    private void configureMembershipScheme(byte[] domain,
+    private void configureMembershipScheme(byte[] localDomain,
                                            List<MembershipManager> membershipManagers)
             throws ClusteringFault {
         String scheme = getMembershipScheme();
@@ -364,10 +366,10 @@
         if (scheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
             membershipScheme = new WkaBasedMembershipScheme(channel, mode,
                                                             membershipManagers,
-                                                            rpcChannel, membershipManager,
-                                                            parameters, domain, members);
+                                                            membershipManager,
+                                                            parameters, localDomain, members);
         } else if (scheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
-            membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters, domain);
+            membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters, localDomain);
         } else {
             String msg = "Invalid membership scheme '" + scheme +
                          "'. Supported schemes are multicast & wka";
@@ -410,11 +412,11 @@
                 if (!sentMembersList.contains(memberHost)) {
                     Response[] responses;
                     do {
-                        responses = rpcChannel.send(new Member[]{member},
-                                                    command,
-                                                    RpcChannel.FIRST_REPLY,
-                                                    Channel.SEND_OPTIONS_ASYNCHRONOUS,
-                                                    10000);
+                        responses = initRpcChannel.send(new Member[]{member},
+                                                        command,
+                                                        RpcChannel.FIRST_REPLY,
+                                                        Channel.SEND_OPTIONS_ASYNCHRONOUS,
+                                                        10000);
                         if (responses.length == 0) {
                             try {
                                 Thread.sleep(500);
@@ -493,7 +495,7 @@
         log.debug("Enter: TribesClusterManager::shutdown");
         if (channel != null) {
             try {
-                channel.removeChannelListener(rpcChannel);
+                channel.removeChannelListener(initRpcChannel);
                 channel.removeChannelListener(channelListener);
                 channel.stop(Channel.DEFAULT);
             } catch (ChannelException e) {
@@ -510,8 +512,8 @@
 
     public void setConfigurationContext(ConfigurationContext configurationContext) {
         this.configurationContext = configurationContext;
-        if (rpcRequestHandler != null) {
-            rpcRequestHandler.setConfigurationContext(configurationContext);
+        if (rpcInitRequestHandler != null) {
+            rpcInitRequestHandler.setConfigurationContext(configurationContext);
         }
         if (channelListener != null) {
             channelListener.setConfigurationContext(configurationContext);

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Mon Jun 30 04:13:47 2008
@@ -20,6 +20,17 @@
  */
 public final class TribesConstants {
 
+    /**
+     * The ID of the RPC initialization message channel
+     */
+    public static final String RPC_INIT_CHANNEL = "rpc.init.channel";
+
+    /**
+     * The ID of the RPC membership message channel. This channel is only used when WKA
+     * membership discovery mechanism is used
+     */
+    public static final String RPC_MEMBERSHIP_CHANNEL = "rpc.membership.channel";
+
     // Message sending and receiving options
     public static final int MSG_ORDER_OPTION = 512;
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jun 30 04:13:47 2008
@@ -65,4 +65,12 @@
     public static String getLocalHost(Channel channel) {
         return getName(channel.getLocalMember(true));
     }
+
+    public static byte[] getRpcMembershipChannelId(byte[] domain) {
+       return (new String(domain) + TribesConstants.RPC_MEMBERSHIP_CHANNEL).getBytes();
+    }
+
+    public static byte[] getRpcInitChannelId(byte[] domain) {
+        return (new String(domain) + TribesConstants.RPC_INIT_CHANNEL).getBytes();
+    }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Mon Jun 30 04:13:47 2008
@@ -21,14 +21,13 @@
 import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
 import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
 import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.clustering.control.wka.RpcMembershipRequestHandler;
 import org.apache.axis2.description.Parameter;
 import org.apache.axis2.util.Utils;
 import org.apache.catalina.tribes.Channel;
 import org.apache.catalina.tribes.ChannelException;
 import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.RemoteProcessException;
 import org.apache.catalina.tribes.group.Response;
-import org.apache.catalina.tribes.group.RpcCallback;
 import org.apache.catalina.tribes.group.RpcChannel;
 import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
 import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
@@ -40,7 +39,6 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -63,7 +61,6 @@
      * The Tribes channel
      */
     private ManagedChannel channel;
-    private RpcChannel rpcChannel;
     private MembershipManager primaryMembershipManager;
     private List<MembershipManager> applicationDomainMembershipManagers;
     private StaticMembershipInterceptor staticMembershipInterceptor;
@@ -72,7 +69,7 @@
     /**
      * The loadBalancerDomain to which the members belong to
      */
-    private byte[] domain;
+    private byte[] localDomain;
 
     /**
      * The static(well-known) members
@@ -87,7 +84,6 @@
     public WkaBasedMembershipScheme(ManagedChannel channel,
                                     Mode mode,
                                     List<MembershipManager> applicationDomainMembershipManagers,
-                                    RpcChannel rpcChannel,
                                     MembershipManager primaryMembershipManager,
                                     Map<String, Parameter> parameters,
                                     byte[] domain,
@@ -95,10 +91,9 @@
         this.channel = channel;
         this.mode = mode;
         this.applicationDomainMembershipManagers = applicationDomainMembershipManagers;
-        this.rpcChannel = rpcChannel;
         this.primaryMembershipManager = primaryMembershipManager;
         this.parameters = parameters;
-        this.domain = domain;
+        this.localDomain = domain;
         this.members = members;
     }
 
@@ -171,7 +166,7 @@
         localMember.setPayload(payload);
         receiver.setPort(port);
         localMember.setPort(port);
-        localMember.setDomain(domain);
+        localMember.setDomain(localDomain);
         staticMembershipInterceptor.setLocalMember(localMember);
 
         // ------------ END: Configure and add the local member ---------------------
@@ -192,7 +187,7 @@
             // Do not add the local member to the list of members
             if (!(Arrays.equals(localMember.getHost(), tribesMember.getHost()) &&
                   localMember.getPort() == tribesMember.getPort())) {
-                tribesMember.setDomain(domain);
+                tribesMember.setDomain(localDomain);
 
                 // We will add the member even if it is offline at this moment. When the
                 // member comes online, it will be detected by the GMS
@@ -324,7 +319,7 @@
             log.debug("Added Static Membership Interceptor");
         }
 
-        channel.getMembershipService().setDomain(domain);
+        channel.getMembershipService().setDomain(localDomain);
         mode.addInterceptors(channel);
 
         // Add a AtMostOnceInterceptor to support at-most-once message processing semantics
@@ -351,20 +346,29 @@
      */
     public void joinGroup() throws ClusteringFault {
 
-        // Have multiple RPC channels with multiple RPC request handlers for each domain
+        // Have multiple RPC channels with multiple RPC request handlers, one per application domain
         // This is needed only when this member is running as a load balancer
         for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
+            appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
 
-            // Create an RpcChannel for each domain
+            // Create an RPC membership channel for this application domain
             String domain = new String(appDomainMembershipManager.getDomain());
-            new RpcChannel(domain.getBytes(),
-                           channel,
-                           new RpcRequestHandler(appDomainMembershipManager));
-            if(log.isDebugEnabled()){
+            RpcChannel rpcMembershipChannel =
+                    new RpcChannel(TribesUtil.getRpcMembershipChannelId(appDomainMembershipManager.getDomain()), 
+                                   channel,
+                                   new RpcMembershipRequestHandler(appDomainMembershipManager));
+            appDomainMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
+            if (log.isDebugEnabled()) {
                 log.debug("Created RPC Channel for application domain " + domain);
             }
         }
 
+        // Create a Membership channel for handling membership requests
+        RpcChannel rpcMembershipChannel =
+                new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
+                               channel, new RpcMembershipRequestHandler(primaryMembershipManager));
+        primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
+
         // Send JOIN message to a WKA member
         if (primaryMembershipManager.getMembers().length > 0) {
             log.info("Sending JOIN message to WKA members...");
@@ -376,14 +380,17 @@
             Response[] responses = null;
             do {
                 try {
-                    responses = rpcChannel.send(wkaMembers,
-                                                new JoinGroupCommand(),
-                                                RpcChannel.ALL_REPLY,
-                                                Channel.SEND_OPTIONS_ASYNCHRONOUS |
-                                                TribesConstants.MEMBERSHIP_MSG_OPTION,
-                                                10000);
+                    responses = rpcMembershipChannel.send(wkaMembers,
+                                                          new JoinGroupCommand(),
+                                                          RpcChannel.ALL_REPLY,
+                                                          Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                                          TribesConstants.MEMBERSHIP_MSG_OPTION,
+                                                          10000);
                     if (responses.length == 0) {
                         try {
+                            if(log.isDebugEnabled()){
+                                log.debug("No responses received");
+                            }
                             Thread.sleep(500);
                         } catch (InterruptedException ignored) {
                         }
@@ -402,7 +409,7 @@
             }
             while (responses == null || responses.length == 0);  // Wait until we've received at least one response
 
-            //TODO: ######## If this node is a LB, it needs to get the entire domain to member-list map
+            //TODO: ######## If this node is a LB, it needs to get the entire domain-to-member-list map
 
             for (Response response : responses) {
                 MemberListCommand command = (MemberListCommand) response.getMessage();
@@ -413,26 +420,26 @@
                 if (!Arrays.equals(response.getSource().getDomain(),
                                    primaryMembershipManager.getLocalMember().getDomain())) {
                     primaryMembershipManager.memberDisappeared(response.getSource());
-                    if(log.isDebugEnabled()){
-                        log.debug("Removed member " + TribesUtil.getName(response.getSource()) + 
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removed member " + TribesUtil.getName(response.getSource()) +
                                   " since it does not belong to the local domain " +
                                   new String(primaryMembershipManager.getLocalMember().getDomain()));
                     }
                 }
             }
 
-            // Send MEMBER_JOINE to the group
+            // Send MEMBER_JOINED to the group
             if (primaryMembershipManager.getMembers().length > 0) {
                 log.info("Sending MEMBER_JOINED to group...");
                 MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
                 memberJoinedCommand.setMember(primaryMembershipManager.getLocalMember());
                 try {
-                    rpcChannel.send(primaryMembershipManager.getMembers(),
-                                    memberJoinedCommand,
-                                    RpcChannel.ALL_REPLY,
-                                    Channel.SEND_OPTIONS_ASYNCHRONOUS |
-                                    TribesConstants.MEMBERSHIP_MSG_OPTION,
-                                    10000);
+                    rpcMembershipChannel.send(primaryMembershipManager.getMembers(),
+                                              memberJoinedCommand,
+                                              RpcChannel.ALL_REPLY,
+                                              Channel.SEND_OPTIONS_ASYNCHRONOUS |
+                                              TribesConstants.MEMBERSHIP_MSG_OPTION,
+                                              10000);
                 } catch (ChannelException e) {
                     String msg = "Could not send MEMBER_JOINED message to group";
                     log.error(msg, e);
@@ -445,59 +452,4 @@
     public Parameter getParameter(String name) {
         return parameters.get(name);
     }
-
-    private class RpcRequestHandler implements RpcCallback {
-
-        private MembershipManager membershipManager;  //TODO: ############# Will need to inform about membership when a WKA member who is a LB joins
-
-        private RpcRequestHandler(MembershipManager membershipManager) {
-            this.membershipManager = membershipManager;
-            membershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
-        }
-
-        public Serializable replyRequest(Serializable msg, org.apache.catalina.tribes.Member sender) {
-            String domain = new String(sender.getDomain());
-            if(log.isDebugEnabled()){
-                log.debug("Request received by RpcRequestHandler for domain " + domain);
-            }
-            if (msg instanceof JoinGroupCommand) {
-                log.info("Received JOIN message from application member " +
-                         TribesUtil.getName(sender) + " in domain " + domain);
-                // Return the list of current members to the caller
-                MemberListCommand memListCmd = new MemberListCommand();
-                memListCmd.setMembers(membershipManager.getMembers());
-
-                membershipManager.memberAdded(sender);
-                return memListCmd;
-            } else if (msg instanceof MemberJoinedCommand) {
-                log.info("Received MEMBER_JOINED message from application member " +
-                         TribesUtil.getName(sender) + " in domain " + domain);
-                try {
-                    MemberJoinedCommand command = (MemberJoinedCommand) msg;
-                    command.setMembershipManager(membershipManager);
-                    command.execute(null);
-                } catch (ClusteringFault e) {
-                    String errMsg = "Cannot handle MEMBER_JOINED notification";
-                    log.error(errMsg, e);
-                    throw new RemoteProcessException(errMsg, e);
-                }
-            } else if (msg instanceof MemberListCommand) {
-                try {                    //TODO: What if we receive more than one member list message?
-                    MemberListCommand command = (MemberListCommand) msg;
-                    command.setMembershipManager(membershipManager);
-                    command.execute(null);
-
-                    //TODO Send MEMBER_JOINED messages to all nodes
-                } catch (ClusteringFault e) {
-                    String errMsg = "Cannot handle MEMBER_LIST message";
-                    log.error(errMsg, e);
-                    throw new RemoteProcessException(errMsg, e);
-                }
-            }
-            return null;
-        }
-
-        public void leftOver(Serializable msg, org.apache.catalina.tribes.Member sender) {
-        }
-    }
 }