You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/30 13:13:47 UTC
svn commit: r672741 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
control/ control/wka/ tribes/
Author: azeez
Date: Mon Jun 30 04:13:47 2008
New Revision: 672741
URL: http://svn.apache.org/viewvc?rev=672741&view=rev
Log:
Introducing separate RPC channel for membership messages (when WKA membership is used) & init messages
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java
- copied, changed from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java
- copied, changed from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
Removed:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateCommand.java Mon Jun 30 04:13:47 2008
@@ -48,12 +48,12 @@
ContextManager contextManager = clusterManager.getContextManager();
if (contextManager != null) {
Map excludedPropPatterns = contextManager.getReplicationExcludePatterns();
- List cmdList = new ArrayList();
+ List<ContextClusteringCommand> cmdList = new ArrayList<ContextClusteringCommand>();
// Add the service group contexts, service contexts & their respective properties
String[] sgCtxIDs = configCtx.getServiceGroupContextIDs();
- for (int i = 0; i < sgCtxIDs.length; i ++) {
- ServiceGroupContext sgCtx = configCtx.getServiceGroupContext(sgCtxIDs[i]);
+ for (String sgCtxID : sgCtxIDs) {
+ ServiceGroupContext sgCtx = configCtx.getServiceGroupContext(sgCtxID);
ContextClusteringCommand updateServiceGroupCtxCmd =
ContextClusteringCommandFactory.getUpdateCommand(sgCtx,
excludedPropPatterns,
@@ -83,8 +83,7 @@
cmdList.add(updateCmd);
}
if (!cmdList.isEmpty()) {
- commands = (ContextClusteringCommand[]) cmdList.
- toArray(new ContextClusteringCommand[cmdList.size()]);
+ commands = cmdList.toArray(new ContextClusteringCommand[cmdList.size()]);
}
}
}
Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java (from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java&r1=671600&r2=672741&rev=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/RpcMembershipRequestHandler.java Mon Jun 30 04:13:47 2008
@@ -17,18 +17,11 @@
* under the License.
*/
-package org.apache.axis2.clustering.tribes;
+package org.apache.axis2.clustering.control.wka;
-import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
-import org.apache.axis2.clustering.control.GetConfigurationCommand;
-import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
-import org.apache.axis2.clustering.control.GetStateCommand;
-import org.apache.axis2.clustering.control.GetStateResponseCommand;
-import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
-import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
-import org.apache.axis2.clustering.control.wka.MemberListCommand;
-import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.clustering.tribes.MembershipManager;
+import org.apache.axis2.clustering.tribes.TribesUtil;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcCallback;
@@ -38,80 +31,41 @@
import java.io.Serializable;
/**
- * Handles RPC Channel requests from members
+ * Handles RPC membership requests from members. This is used only in conjunction with WKA based
+ * membership mamangement
*/
-public class RpcRequestHandler implements RpcCallback {
+public class RpcMembershipRequestHandler implements RpcCallback {
- private static Log log = LogFactory.getLog(RpcRequestHandler.class);
- private ConfigurationContext configurationContext;
+ private static Log log = LogFactory.getLog(RpcMembershipRequestHandler.class);
private MembershipManager membershipManager;
- public RpcRequestHandler(ConfigurationContext configurationContext,
- MembershipManager membershipManager) {
- this.configurationContext = configurationContext;
+ public RpcMembershipRequestHandler(MembershipManager membershipManager) {
this.membershipManager = membershipManager;
}
- public void setConfigurationContext(ConfigurationContext configurationContext) {
- this.configurationContext = configurationContext;
- }
+ public Serializable replyRequest(Serializable msg, Member sender) {
+ String domain = new String(sender.getDomain());
+ if (log.isDebugEnabled()) {
+ log.debug("Membership request received by RpcMembershipRequestHandler for domain " +
+ domain);
+ }
- public Serializable replyRequest(Serializable msg, Member invoker) {
- if (msg instanceof GetStateCommand) {
- // If a GetStateRequest is received by a node which has not yet initialized
- // this node cannot send a response to the state requester. So we simply return.
- if (configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
- return null;
- }
- try {
- log.info("Received " + msg + " initialization request message from " +
- TribesUtil.getName(invoker));
- GetStateCommand command = (GetStateCommand) msg;
- command.execute(configurationContext);
- GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
- getStateRespCmd.setCommands(command.getCommands());
- return getStateRespCmd;
- } catch (ClusteringFault e) {
- String errMsg = "Cannot handle initialization request";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
- } else if (msg instanceof GetConfigurationCommand) {
- // If a GetConfigurationCommand is received by a node which has not yet initialized
- // this node cannot send a response to the state requester. So we simply return.
- if (configurationContext.
- getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
- return null;
- }
- try {
- log.info("Received " + msg + " initialization request message from " +
- TribesUtil.getName(invoker));
- GetConfigurationCommand command = (GetConfigurationCommand) msg;
- command.execute(configurationContext);
- GetConfigurationResponseCommand
- getConfigRespCmd = new GetConfigurationResponseCommand();
- getConfigRespCmd.setServiceGroups(command.getServiceGroupNames());
- return getConfigRespCmd;
- } catch (ClusteringFault e) {
- String errMsg = "Cannot handle initialization request";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
- } else if (msg instanceof JoinGroupCommand) {
- log.info("Received JOIN message from " + TribesUtil.getName(invoker));
- membershipManager.memberAdded(invoker);
+ if (msg instanceof JoinGroupCommand) {
+ log.info("Received JOIN message from " + TribesUtil.getName(sender) +
+ TribesUtil.getName(sender) + " in domain " + domain);
+ membershipManager.memberAdded(sender);
// Return the list of current members to the caller
MemberListCommand memListCmd = new MemberListCommand();
memListCmd.setMembers(membershipManager.getMembers());
return memListCmd;
} else if (msg instanceof MemberJoinedCommand) {
- log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(invoker));
+ log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(sender) +
+ TribesUtil.getName(sender) + " in domain " + domain);
try {
MemberJoinedCommand command = (MemberJoinedCommand) msg;
command.setMembershipManager(membershipManager);
- command.execute(configurationContext);
+ command.execute(null);
} catch (ClusteringFault e) {
String errMsg = "Cannot handle MEMBER_JOINED notification";
log.error(errMsg, e);
@@ -121,7 +75,7 @@
try { //TODO: What if we receive more than one member list message?
MemberListCommand command = (MemberListCommand) msg;
command.setMembershipManager(membershipManager);
- command.execute(configurationContext);
+ command.execute(null);
//TODO Send MEMBER_JOINED messages to all nodes
} catch (ClusteringFault e) {
@@ -137,4 +91,4 @@
//TODO: Method implementation
}
-}
+}
\ No newline at end of file
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jun 30 04:13:47 2008
@@ -47,7 +47,7 @@
private static final Log log = LogFactory.getLog(MembershipManager.class);
- private RpcChannel rpcChannel;
+ private RpcChannel rpcMembershipChannel;
private StaticMembershipInterceptor staticMembershipInterceptor;
/**
@@ -64,8 +64,8 @@
public MembershipManager() {
}
- public void setRpcChannel(RpcChannel rpcChannel) {
- this.rpcChannel = rpcChannel;
+ public void setRpcMembershipChannel(RpcChannel rpcMembershipChannel) {
+ this.rpcMembershipChannel = rpcMembershipChannel;
}
public void setStaticMembershipInterceptor(
@@ -150,7 +150,7 @@
}
if (shouldAddMember) {
- if (rpcChannel != null && isLocalMemberInitialized() &&
+ if (rpcMembershipChannel != null && isLocalMemberInitialized() &&
wkaMembers.contains(member)) { // if it is a well-known member
log.info("A WKA member " + TribesUtil.getName(member) +
@@ -162,7 +162,7 @@
List<Member> members = new ArrayList<Member>(this.members);
members.add(localMember); // Need to set the local member too
memListCmd.setMembers(members.toArray(new Member[members.size()]));
- rpcChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
+ rpcMembershipChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
} catch (Exception e) {
String errMsg = "Could not send MEMBER_LIST to well-known member " +
Copied: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java (from r671600, webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java)
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java?p2=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java&p1=webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java&r1=671600&r2=672741&rev=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcInitializationRequestHandler.java Mon Jun 30 04:13:47 2008
@@ -25,9 +25,6 @@
import org.apache.axis2.clustering.control.GetConfigurationResponseCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.clustering.control.GetStateResponseCommand;
-import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
-import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
-import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
@@ -40,16 +37,13 @@
/**
* Handles RPC Channel requests from members
*/
-public class RpcRequestHandler implements RpcCallback {
+public class RpcInitializationRequestHandler implements RpcCallback {
- private static Log log = LogFactory.getLog(RpcRequestHandler.class);
+ private static Log log = LogFactory.getLog(RpcInitializationRequestHandler.class);
private ConfigurationContext configurationContext;
- private MembershipManager membershipManager;
- public RpcRequestHandler(ConfigurationContext configurationContext,
- MembershipManager membershipManager) {
+ public RpcInitializationRequestHandler(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
- this.membershipManager = membershipManager;
}
public void setConfigurationContext(ConfigurationContext configurationContext) {
@@ -57,6 +51,9 @@
}
public Serializable replyRequest(Serializable msg, Member invoker) {
+ if (log.isDebugEnabled()) {
+ log.debug("Initialization request received by RpcInitializationRequestHandler");
+ }
if (msg instanceof GetStateCommand) {
// If a GetStateRequest is received by a node which has not yet initialized
// this node cannot send a response to the state requester. So we simply return.
@@ -98,38 +95,7 @@
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
- } else if (msg instanceof JoinGroupCommand) {
- log.info("Received JOIN message from " + TribesUtil.getName(invoker));
- membershipManager.memberAdded(invoker);
-
- // Return the list of current members to the caller
- MemberListCommand memListCmd = new MemberListCommand();
- memListCmd.setMembers(membershipManager.getMembers());
- return memListCmd;
- } else if (msg instanceof MemberJoinedCommand) {
- log.info("Received MEMBER_JOINED message from " + TribesUtil.getName(invoker));
- try {
- MemberJoinedCommand command = (MemberJoinedCommand) msg;
- command.setMembershipManager(membershipManager);
- command.execute(configurationContext);
- } catch (ClusteringFault e) {
- String errMsg = "Cannot handle MEMBER_JOINED notification";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
- } else if (msg instanceof MemberListCommand) {
- try { //TODO: What if we receive more than one member list message?
- MemberListCommand command = (MemberListCommand) msg;
- command.setMembershipManager(membershipManager);
- command.execute(configurationContext);
-
- //TODO Send MEMBER_JOINED messages to all nodes
- } catch (ClusteringFault e) {
- String errMsg = "Cannot handle MEMBER_LIST message";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
- }
+ }
return null;
}
@@ -137,4 +103,4 @@
//TODO: Method implementation
}
-}
+}
\ No newline at end of file
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jun 30 04:13:47 2008
@@ -75,12 +75,13 @@
private HashMap<String, Parameter> parameters;
private ManagedChannel channel;
- private RpcChannel rpcChannel;
+ private RpcChannel initRpcChannel;
+ private RpcChannel membershipRpcChannel;
private ConfigurationContext configurationContext;
private ChannelListener channelListener;
private ChannelSender channelSender;
private MembershipManager membershipManager;
- private RpcRequestHandler rpcRequestHandler;
+ private RpcInitializationRequestHandler rpcInitRequestHandler;
private MembershipScheme membershipScheme;
/**
@@ -151,12 +152,13 @@
// RpcChannel is a ChannelListener. When the reply to a particular request comes back, it
// picks it up. Each RPC is given a UUID, hence can correlate the request-response pair
- rpcRequestHandler = new RpcRequestHandler(configurationContext, membershipManager);
- rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+ rpcInitRequestHandler = new RpcInitializationRequestHandler(configurationContext);
+ initRpcChannel =
+ new RpcChannel(TribesUtil.getRpcInitChannelId(domain),
+ channel, rpcInitRequestHandler);
if (log.isDebugEnabled()) {
log.debug("Created RPC Channel for domain " + new String(domain));
}
- membershipManager.setRpcChannel(rpcChannel);
setMaximumRetries();
configureMode(domain);
@@ -351,12 +353,12 @@
/**
* Handle specific configurations related to different membership management schemes.
*
- * @param domain The clustering loadBalancerDomain to which this member belongs to
+ * @param localDomain The clustering loadBalancerDomain to which this member belongs to
* @param membershipManagers MembershipManagers for different domains
* @throws ClusteringFault If the membership scheme is invalid, or if an error occurs
* while configuring membership scheme
*/
- private void configureMembershipScheme(byte[] domain,
+ private void configureMembershipScheme(byte[] localDomain,
List<MembershipManager> membershipManagers)
throws ClusteringFault {
String scheme = getMembershipScheme();
@@ -364,10 +366,10 @@
if (scheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
membershipScheme = new WkaBasedMembershipScheme(channel, mode,
membershipManagers,
- rpcChannel, membershipManager,
- parameters, domain, members);
+ membershipManager,
+ parameters, localDomain, members);
} else if (scheme.equals(ClusteringConstants.MembershipScheme.MULTICAST_BASED)) {
- membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters, domain);
+ membershipScheme = new MulticastBasedMembershipScheme(channel, mode, parameters, localDomain);
} else {
String msg = "Invalid membership scheme '" + scheme +
"'. Supported schemes are multicast & wka";
@@ -410,11 +412,11 @@
if (!sentMembersList.contains(memberHost)) {
Response[] responses;
do {
- responses = rpcChannel.send(new Member[]{member},
- command,
- RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
+ responses = initRpcChannel.send(new Member[]{member},
+ command,
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
if (responses.length == 0) {
try {
Thread.sleep(500);
@@ -493,7 +495,7 @@
log.debug("Enter: TribesClusterManager::shutdown");
if (channel != null) {
try {
- channel.removeChannelListener(rpcChannel);
+ channel.removeChannelListener(initRpcChannel);
channel.removeChannelListener(channelListener);
channel.stop(Channel.DEFAULT);
} catch (ChannelException e) {
@@ -510,8 +512,8 @@
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
- if (rpcRequestHandler != null) {
- rpcRequestHandler.setConfigurationContext(configurationContext);
+ if (rpcInitRequestHandler != null) {
+ rpcInitRequestHandler.setConfigurationContext(configurationContext);
}
if (channelListener != null) {
channelListener.setConfigurationContext(configurationContext);
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesConstants.java Mon Jun 30 04:13:47 2008
@@ -20,6 +20,17 @@
*/
public final class TribesConstants {
+ /**
+ * The ID of the RPC initialization message channel
+ */
+ public static final String RPC_INIT_CHANNEL = "rpc.init.channel";
+
+ /**
+ * The ID of the RPC membership message channel. This channel is only used when WKA
+ * membership discovery mechanism is used
+ */
+ public static final String RPC_MEMBERSHIP_CHANNEL = "rpc.membership.channel";
+
// Message sending and receiving options
public static final int MSG_ORDER_OPTION = 512;
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jun 30 04:13:47 2008
@@ -65,4 +65,12 @@
public static String getLocalHost(Channel channel) {
return getName(channel.getLocalMember(true));
}
+
+ public static byte[] getRpcMembershipChannelId(byte[] domain) {
+ return (new String(domain) + TribesConstants.RPC_MEMBERSHIP_CHANNEL).getBytes();
+ }
+
+ public static byte[] getRpcInitChannelId(byte[] domain) {
+ return (new String(domain) + TribesConstants.RPC_INIT_CHANNEL).getBytes();
+ }
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java?rev=672741&r1=672740&r2=672741&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/WkaBasedMembershipScheme.java Mon Jun 30 04:13:47 2008
@@ -21,14 +21,13 @@
import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.axis2.clustering.control.wka.RpcMembershipRequestHandler;
import org.apache.axis2.description.Parameter;
import org.apache.axis2.util.Utils;
import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.ChannelException;
import org.apache.catalina.tribes.ManagedChannel;
-import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.Response;
-import org.apache.catalina.tribes.group.RpcCallback;
import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.OrderInterceptor;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
@@ -40,7 +39,6 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
-import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
@@ -63,7 +61,6 @@
* The Tribes channel
*/
private ManagedChannel channel;
- private RpcChannel rpcChannel;
private MembershipManager primaryMembershipManager;
private List<MembershipManager> applicationDomainMembershipManagers;
private StaticMembershipInterceptor staticMembershipInterceptor;
@@ -72,7 +69,7 @@
/**
* The loadBalancerDomain to which the members belong to
*/
- private byte[] domain;
+ private byte[] localDomain;
/**
* The static(well-known) members
@@ -87,7 +84,6 @@
public WkaBasedMembershipScheme(ManagedChannel channel,
Mode mode,
List<MembershipManager> applicationDomainMembershipManagers,
- RpcChannel rpcChannel,
MembershipManager primaryMembershipManager,
Map<String, Parameter> parameters,
byte[] domain,
@@ -95,10 +91,9 @@
this.channel = channel;
this.mode = mode;
this.applicationDomainMembershipManagers = applicationDomainMembershipManagers;
- this.rpcChannel = rpcChannel;
this.primaryMembershipManager = primaryMembershipManager;
this.parameters = parameters;
- this.domain = domain;
+ this.localDomain = domain;
this.members = members;
}
@@ -171,7 +166,7 @@
localMember.setPayload(payload);
receiver.setPort(port);
localMember.setPort(port);
- localMember.setDomain(domain);
+ localMember.setDomain(localDomain);
staticMembershipInterceptor.setLocalMember(localMember);
// ------------ END: Configure and add the local member ---------------------
@@ -192,7 +187,7 @@
// Do not add the local member to the list of members
if (!(Arrays.equals(localMember.getHost(), tribesMember.getHost()) &&
localMember.getPort() == tribesMember.getPort())) {
- tribesMember.setDomain(domain);
+ tribesMember.setDomain(localDomain);
// We will add the member even if it is offline at this moment. When the
// member comes online, it will be detected by the GMS
@@ -324,7 +319,7 @@
log.debug("Added Static Membership Interceptor");
}
- channel.getMembershipService().setDomain(domain);
+ channel.getMembershipService().setDomain(localDomain);
mode.addInterceptors(channel);
// Add a AtMostOnceInterceptor to support at-most-once message processing semantics
@@ -351,20 +346,29 @@
*/
public void joinGroup() throws ClusteringFault {
- // Have multiple RPC channels with multiple RPC request handlers for each domain
+ // Have multiple RPC channels with multiple RPC request handlers for each localDomain
// This is needed only when this member is running as a load balancer
for (MembershipManager appDomainMembershipManager : applicationDomainMembershipManagers) {
+ appDomainMembershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
- // Create an RpcChannel for each domain
+ // Create an RpcChannel for each localDomain
String domain = new String(appDomainMembershipManager.getDomain());
- new RpcChannel(domain.getBytes(),
- channel,
- new RpcRequestHandler(appDomainMembershipManager));
- if(log.isDebugEnabled()){
+ RpcChannel rpcMembershipChannel =
+ new RpcChannel(TribesUtil.getRpcMembershipChannelId(appDomainMembershipManager.getDomain()),
+ channel,
+ new RpcMembershipRequestHandler(appDomainMembershipManager));
+ appDomainMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
+ if (log.isDebugEnabled()) {
log.debug("Created RPC Channel for application domain " + domain);
}
}
+ // Create a Membership channel for handling membership requests
+ RpcChannel rpcMembershipChannel =
+ new RpcChannel(TribesUtil.getRpcMembershipChannelId(localDomain),
+ channel, new RpcMembershipRequestHandler(primaryMembershipManager));
+ primaryMembershipManager.setRpcMembershipChannel(rpcMembershipChannel);
+
// Send JOIN message to a WKA member
if (primaryMembershipManager.getMembers().length > 0) {
log.info("Sending JOIN message to WKA members...");
@@ -376,14 +380,17 @@
Response[] responses = null;
do {
try {
- responses = rpcChannel.send(wkaMembers,
- new JoinGroupCommand(),
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS |
- TribesConstants.MEMBERSHIP_MSG_OPTION,
- 10000);
+ responses = rpcMembershipChannel.send(wkaMembers,
+ new JoinGroupCommand(),
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION,
+ 10000);
if (responses.length == 0) {
try {
+ if(log.isDebugEnabled()){
+ log.debug("No responses received");
+ }
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
@@ -402,7 +409,7 @@
}
while (responses == null || responses.length == 0); // Wait until we've received at least one response
- //TODO: ######## If this node is a LB, it needs to get the entire domain to member-list map
+ //TODO: ######## If this node is a LB, it needs to get the entire localDomain to member-list map
for (Response response : responses) {
MemberListCommand command = (MemberListCommand) response.getMessage();
@@ -413,26 +420,26 @@
if (!Arrays.equals(response.getSource().getDomain(),
primaryMembershipManager.getLocalMember().getDomain())) {
primaryMembershipManager.memberDisappeared(response.getSource());
- if(log.isDebugEnabled()){
- log.debug("Removed member " + TribesUtil.getName(response.getSource()) +
+ if (log.isDebugEnabled()) {
+ log.debug("Removed member " + TribesUtil.getName(response.getSource()) +
" since it does not belong to the local domain " +
new String(primaryMembershipManager.getLocalMember().getDomain()));
}
}
}
- // Send MEMBER_JOINE to the group
+ // Send MEMBER_JOINED to the group
if (primaryMembershipManager.getMembers().length > 0) {
log.info("Sending MEMBER_JOINED to group...");
MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
memberJoinedCommand.setMember(primaryMembershipManager.getLocalMember());
try {
- rpcChannel.send(primaryMembershipManager.getMembers(),
- memberJoinedCommand,
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS |
- TribesConstants.MEMBERSHIP_MSG_OPTION,
- 10000);
+ rpcMembershipChannel.send(primaryMembershipManager.getMembers(),
+ memberJoinedCommand,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS |
+ TribesConstants.MEMBERSHIP_MSG_OPTION,
+ 10000);
} catch (ChannelException e) {
String msg = "Could not send MEMBER_JOINED message to group";
log.error(msg, e);
@@ -445,59 +452,4 @@
public Parameter getParameter(String name) {
return parameters.get(name);
}
-
- private class RpcRequestHandler implements RpcCallback {
-
- private MembershipManager membershipManager; //TODO: ############# Will need to inform about membership when a WKA member who is a LB joins
-
- private RpcRequestHandler(MembershipManager membershipManager) {
- this.membershipManager = membershipManager;
- membershipManager.setStaticMembershipInterceptor(staticMembershipInterceptor);
- }
-
- public Serializable replyRequest(Serializable msg, org.apache.catalina.tribes.Member sender) {
- String domain = new String(sender.getDomain());
- if(log.isDebugEnabled()){
- log.debug("Request received by RpcRequestHandler for domain " + domain);
- }
- if (msg instanceof JoinGroupCommand) {
- log.info("Received JOIN message from application member " +
- TribesUtil.getName(sender) + " in domain " + domain);
- // Return the list of current members to the caller
- MemberListCommand memListCmd = new MemberListCommand();
- memListCmd.setMembers(membershipManager.getMembers());
-
- membershipManager.memberAdded(sender);
- return memListCmd;
- } else if (msg instanceof MemberJoinedCommand) {
- log.info("Received MEMBER_JOINED message from application member " +
- TribesUtil.getName(sender) + " in domain " + domain);
- try {
- MemberJoinedCommand command = (MemberJoinedCommand) msg;
- command.setMembershipManager(membershipManager);
- command.execute(null);
- } catch (ClusteringFault e) {
- String errMsg = "Cannot handle MEMBER_JOINED notification";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
- } else if (msg instanceof MemberListCommand) {
- try { //TODO: What if we receive more than one member list message?
- MemberListCommand command = (MemberListCommand) msg;
- command.setMembershipManager(membershipManager);
- command.execute(null);
-
- //TODO Send MEMBER_JOINED messages to all nodes
- } catch (ClusteringFault e) {
- String errMsg = "Cannot handle MEMBER_LIST message";
- log.error(errMsg, e);
- throw new RemoteProcessException(errMsg, e);
- }
- }
- return null;
- }
-
- public void leftOver(Serializable msg, org.apache.catalina.tribes.Member sender) {
- }
- }
}