You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/25 06:59:16 UTC
svn commit: r659911 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes:
MembershipManager.java RpcRequestHandler.java TribesClusterManager.java
Author: azeez
Date: Sat May 24 21:59:15 2008
New Revision: 659911
URL: http://svn.apache.org/viewvc?rev=659911&view=rev
Log:
When a response is received from a well-known member, we will the receiver will replace the details of the WKA member obtained from the configuration file with those sent by the WKA member
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=659911&r1=659910&r2=659911&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Sat May 24 21:59:15 2008
@@ -76,6 +76,10 @@
wkaMembers.add(wkaMember);
}
+ public void removeWellKnownMember(Member wkaMember) {
+ wkaMembers.remove(wkaMember);
+ }
+
/**
* A new member is added
*
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=659911&r1=659910&r2=659911&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Sat May 24 21:59:15 2008
@@ -145,9 +145,7 @@
throw new RemoteProcessException(errMsg, e);
}
}
-
- //TODO: If a WKA member fails, it shud figure out the membership. The WKA member write membership to a local file
- return null;
+ return null;
}
public void leftOver(Serializable msg, Member member) {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=659911&r1=659910&r2=659911&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Sat May 24 21:59:15 2008
@@ -178,37 +178,54 @@
// If a WKA scheme is used, JOIN the group and get the member list
if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
try {
- Response[] responses = rpcChannel.send(membershipManager.getMembers(),
- new JoinGroupCommand(),
- RpcChannel.FIRST_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
- if (responses.length > 0) {
- Member source = responses[0].getSource();
- MemberListCommand command = (MemberListCommand) responses[0].getMessage();
- command.setMembershipManager(membershipManager);
- command.setStaticMembershipInterceptor(staticMembershipInterceptor);
- command.execute(configurationContext);
-
- log.info("Sending MEMBER_JOINED to group...");
- MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
- memberJoinedCommand.setMember(membershipManager.getLocalMember());
- try {
- Member[] currentMembers = membershipManager.getMembers();
- Member[] sendTo = new Member[currentMembers.length - 1];
- int j = 0;
- for (Member currentMember : currentMembers) {
- if (!currentMember.equals(source)) { // Don't send back to the sender
- sendTo[j] = currentMember;
- j++;
+ Member[] wkaMembers = membershipManager.getMembers(); // The well-known members
+ for (Member wkaMember : wkaMembers) {
+ Response[] responses = rpcChannel.send(new Member[]{wkaMember},
+ new JoinGroupCommand(),
+ RpcChannel.FIRST_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS,
+ 10000);
+ if (responses.length > 0) {
+ Member source = responses[0].getSource();
+ MemberListCommand command = (MemberListCommand) responses[0].getMessage();
+ command.setMembershipManager(membershipManager);
+ command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ command.execute(configurationContext);
+
+ // Replace the WKA member, with the details received from that member
+ // This is because we may have used a public IP address to point to the
+ // wka member. But subequently, we need to use the private IP address
+ StaticMember member = new StaticMember();
+ member.setHost(source.getHost());
+ member.setPort(source.getPort());
+ member.setMemberAliveTime(source.getMemberAliveTime());
+ member.setPayload("ping".getBytes());
+ membershipManager.removeWellKnownMember(wkaMember);
+ staticMembershipInterceptor.removeStaticMember(wkaMember);
+ membershipManager.addWellKnownMember(member);
+ membershipManager.memberAdded(member);
+ staticMembershipInterceptor.memberAdded(member);
+
+ log.info("Sending MEMBER_JOINED to group...");
+ MemberJoinedCommand memberJoinedCommand = new MemberJoinedCommand();
+ memberJoinedCommand.setMember(membershipManager.getLocalMember());
+ try {
+ Member[] currentMembers = membershipManager.getMembers();
+ Member[] sendTo = new Member[currentMembers.length - 1];
+ int j = 0;
+ for (Member currentMember : currentMembers) {
+ if (!currentMember.equals(source)) { // Don't send back to the sender
+ sendTo[j] = currentMember;
+ j++;
+ }
}
+ rpcChannel.send(sendTo, memberJoinedCommand, RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+ } catch (ChannelException e) {
+ String msg = "Could not send MEMBER_JOINED message to group";
+ log.error(msg, e);
+ throw new ClusteringFault(msg, e);
}
- rpcChannel.send(sendTo, memberJoinedCommand, RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
- } catch (ChannelException e) {
- String msg = "Could not send MEMBER_JOINED message to group";
- log.error(msg, e);
- throw new ClusteringFault(msg, e);
}
}
} catch (ChannelException e) {