You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/05/23 08:23:45 UTC
svn commit: r659446 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
control/wka/ tribes/
Author: azeez
Date: Thu May 22 23:23:42 2008
New Revision: 659446
URL: http://svn.apache.org/viewvc?rev=659446&view=rev
Log:
Improvemements to WKA memebership scheme
1. When a new member joins, a MEMBER_LIST message is sent to it
2. A new member will advertise its newly received group membership to all group memeber
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/JoinGroupCommand.java Thu May 22 23:23:42 2008
@@ -15,14 +15,15 @@
*/
package org.apache.axis2.clustering.control.wka;
-import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.ClusteringFault;
+import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
- *
+ * This is the message a member will send to another member when it intends to join a group.
+ * This is used with WKA based membership
*/
public class JoinGroupCommand extends ControlCommand {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/LeaveGroupCommand.java Thu May 22 23:23:42 2008
@@ -20,7 +20,7 @@
import org.apache.axis2.context.ConfigurationContext;
/**
- *
+ * When a member wishes to gracefully leave a group, it will send this message to the group
*/
public class LeaveGroupCommand extends ControlCommand {
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberJoinedCommand.java Thu May 22 23:23:42 2008
@@ -21,17 +21,16 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import java.util.Arrays;
/**
- *
+ * This is the notification message a member will send to all others in the group after it has
+ * joined the group. When the other members received this message, they will add the newly joined
+ * member to their member list
*/
public class MemberJoinedCommand extends ControlCommand {
- private static final Log log = LogFactory.getLog(MemberJoinedCommand.class);
private Member member;
private MembershipManager membershipManager;
private StaticMembershipInterceptor staticMembershipInterceptor;
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/wka/MemberListCommand.java Thu May 22 23:23:42 2008
@@ -18,17 +18,24 @@
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.tribes.MembershipManager;
+import org.apache.axis2.clustering.tribes.TribesUtil;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.Arrays;
/**
- *
+ * When a new member wishes to join a group, it will send a {@link JoinGroupCommand} message to
+ * a known member. Then this known member will respond with this MemberListCommand message.
+ * This message will contain a list of all current members.
*/
public class MemberListCommand extends ControlCommand {
+ private static final Log log = LogFactory.getLog(MemberListCommand.class);
+
private Member[] members;
private MembershipManager membershipManager;
private StaticMembershipInterceptor staticMembershipInterceptor;
@@ -37,8 +44,7 @@
this.membershipManager = membershipManager;
}
- public void setStaticMembershipInterceptor(
- StaticMembershipInterceptor staticMembershipInterceptor) {
+ public void setStaticMembershipInterceptor(StaticMembershipInterceptor staticMembershipInterceptor) {
this.staticMembershipInterceptor = staticMembershipInterceptor;
}
@@ -47,10 +53,12 @@
}
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
+ log.info("Received MEMBER_LIST message");
for (Member member : members) {
Member localMember = membershipManager.getLocalMember();
if (!(Arrays.equals(localMember.getHost(), member.getHost()) &&
localMember.getPort() == member.getPort())) {
+ log.info("Added member " + TribesUtil.getHost(member));
membershipManager.memberAdded(member);
staticMembershipInterceptor.memberAdded(member);
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Thu May 22 23:23:42 2008
@@ -29,11 +29,9 @@
import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.group.RpcCallback;
-import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.group.interceptors.StaticMembershipInterceptor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,7 +47,6 @@
private ConfigurationContext configurationContext;
private MembershipManager membershipManager;
private StaticMembershipInterceptor staticMembershipInterceptor;
- private RpcChannel rpcChannel;
public RpcRequestHandler(ConfigurationContext configurationContext,
MembershipManager membershipManager,
@@ -59,15 +56,11 @@
this.staticMembershipInterceptor = staticMembershipInterceptor;
}
- public void setRpcChannel(RpcChannel rpcChannel) {
- this.rpcChannel = rpcChannel;
- }
-
public void setConfigurationContext(ConfigurationContext configurationContext) {
this.configurationContext = configurationContext;
}
- public Serializable replyRequest(Serializable msg, Member member) {
+ public Serializable replyRequest(Serializable msg, Member invoker) {
if (msg instanceof GetStateCommand) {
// If a GetStateRequest is received by a node which has not yet initialized
// this node cannot send a response to the state requester. So we simply return.
@@ -77,7 +70,7 @@
}
try {
log.info("Received " + msg + " initialization request message from " +
- TribesUtil.getHost(member));
+ TribesUtil.getHost(invoker));
GetStateCommand command = (GetStateCommand) msg;
command.execute(configurationContext);
GetStateResponseCommand getStateRespCmd = new GetStateResponseCommand();
@@ -97,7 +90,7 @@
}
try {
log.info("Received " + msg + " initialization request message from " +
- TribesUtil.getHost(member));
+ TribesUtil.getHost(invoker));
GetConfigurationCommand command = (GetConfigurationCommand) msg;
command.execute(configurationContext);
GetConfigurationResponseCommand
@@ -110,25 +103,16 @@
throw new RemoteProcessException(errMsg, e);
}
} else if (msg instanceof JoinGroupCommand) {
- log.info("Received JOIN message from " + TribesUtil.getHost(member));
+ log.info("Received JOIN message from " + TribesUtil.getHost(invoker));
MemberListCommand memListCmd;
try {
// Add the member
- staticMembershipInterceptor.memberAdded(member);
- membershipManager.memberAdded(member);
+ staticMembershipInterceptor.memberAdded(invoker);
+ membershipManager.memberAdded(invoker);
// Return the list of current members to the caller
memListCmd = new MemberListCommand();
memListCmd.setMembers(membershipManager.getMembers());
-
- // Send a message to all other informing that a member has joined
- MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
- memberJoinedCommand.setMember(member);
- rpcChannel.send(membershipManager.getMembers(),
- memberJoinedCommand,
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
} catch (Exception e) {
String errMsg = "Cannot handle JOIN request";
log.error(errMsg, e);
@@ -136,7 +120,7 @@
}
return memListCmd;
} else if (msg instanceof MemberJoinedCommand) {
- log.info("Received MEMBER_JOINED message from " + TribesUtil.getHost(member));
+ log.info("Received MEMBER_JOINED message from " + TribesUtil.getHost(invoker));
try {
MemberJoinedCommand command = (MemberJoinedCommand) msg;
command.setMembershipManager(membershipManager);
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=659446&r1=659445&r2=659446&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Thu May 22 23:23:42 2008
@@ -34,6 +34,7 @@
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.clustering.control.wka.JoinGroupCommand;
+import org.apache.axis2.clustering.control.wka.MemberJoinedCommand;
import org.apache.axis2.clustering.control.wka.MemberListCommand;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.HandlerDescription;
@@ -168,7 +169,6 @@
membershipManager,
staticMembershipInterceptor);
rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
- rpcRequestHandler.setRpcChannel(rpcChannel);
log.info("Local Member " + TribesUtil.getLocalHost(channel));
@@ -183,10 +183,35 @@
Channel.SEND_OPTIONS_ASYNCHRONOUS,
10000);
if (responses.length > 0) {
+ Member source = responses[0].getSource();
MemberListCommand command = (MemberListCommand) responses[0].getMessage();
command.setMembershipManager(membershipManager);
command.setStaticMembershipInterceptor(staticMembershipInterceptor);
- command.execute(configurationContext); // Do the initialization
+ command.execute(configurationContext);
+
+ log.info("Sending MEMBER_JOINED to group...");
+ MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+ memberJoinedCommand.setMember(membershipManager.getLocalMember());
+ try {
+ Member[] currentMembers = membershipManager.getMembers();
+ Member[] sendTo = new Member[currentMembers.length - 1];
+ int j = 0;
+ for (Member currentMember : currentMembers) {
+ if (!currentMember.equals(source)) {
+ sendTo[j] = currentMember;
+ j++;
+ }
+ }
+ rpcChannel.send(sendTo,
+ memberJoinedCommand,
+ RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ } catch (ChannelException e) {
+ String msg = "Could not send MEMBER_JOINED message to group";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
+ }
}
} catch (ChannelException e) {
String msg = "Could not JOIN group";