You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/05/23 12:31:55 UTC
svn commit: r659492 - in /webservices/axis2/trunk/java/modules:
clustering/src/org/apache/axis2/clustering/tribes/
kernel/src/org/apache/axis2/clustering/
kernel/src/org/apache/axis2/deployment/
Author: azeez
Date: Fri May 23 03:31:54 2008
New Revision: 659492
URL: http://svn.apache.org/viewvc?rev=659492&view=rev
Log:
When a WKA member fails and comes back up, other members will have to send the MEMBER_LIST to it
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java
webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Fri May 23 03:31:54 2008
@@ -80,8 +80,7 @@
} catch (ChannelException e) {
log.error("Could not send message to some members", e);
ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
- for (int i = 0; i < faultyMembers.length; i++) {
- ChannelException.FaultyMember faultyMember = faultyMembers[i];
+ for (ChannelException.FaultyMember faultyMember : faultyMembers) {
Member member = faultyMember.getMember();
log.error("Member " + TribesUtil.getHost(member) + " is faulty",
faultyMember.getCause());
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Fri May 23 03:31:54 2008
@@ -19,8 +19,14 @@
package org.apache.axis2.clustering.tribes;
+import org.apache.axis2.clustering.control.wka.MemberListCommand;
+import org.apache.catalina.tribes.Channel;
import org.apache.catalina.tribes.Member;
+import org.apache.catalina.tribes.RemoteProcessException;
+import org.apache.catalina.tribes.group.RpcChannel;
import org.apache.catalina.tribes.membership.MemberImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
import java.util.List;
@@ -30,12 +36,30 @@
* Responsible for managing the membership. Handles membership changes.
*/
public class MembershipManager {
+
+ private static final Log log = LogFactory.getLog(MembershipManager.class);
+
+ private RpcChannel rpcChannel;
+
+ public MembershipManager() {
+ }
+
+ public void setRpcChannel(RpcChannel rpcChannel) {
+ this.rpcChannel = rpcChannel;
+ }
+
/**
- * List of members in the cluster
+ * List of current members in the cluster. Only the members who are alive will be in this
+ * list
*/
private final List<Member> members = new ArrayList<Member>();
/**
+ * List of Well-Known members. These members may or may not be alive at a given moment.
+ */
+ private List<Member> wkaMembers = new ArrayList<Member>();
+
+ /**
* The member representing this node
*/
private Member localMember;
@@ -48,6 +72,10 @@
this.localMember = localMember;
}
+ public void addWellKnownMember(Member wkaMember) {
+ wkaMembers.add(wkaMember);
+ }
+
/**
* A new member is added
*
@@ -56,6 +84,24 @@
*/
public synchronized boolean memberAdded(Member member) {
if (!members.contains(member)) {
+ if (rpcChannel != null && wkaMembers.contains(member)) { // if it is a well-known member
+
+ log.info("A WKA member " + TribesUtil.getHost(member) +
+ " just joined the group. Sending MEMBER_LIST message.");
+ // send the member list to it
+ MemberListCommand memListCmd;
+ try {
+ memListCmd = new MemberListCommand();
+ memListCmd.setMembers(getMembers());
+ rpcChannel.send(new Member[]{member}, memListCmd, RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
+ } catch (Exception e) {
+ String errMsg = "Could not send MEMBER_LIST to well-known member " +
+ TribesUtil.getHost(member);
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
+ }
members.add(member);
return true;
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/RpcRequestHandler.java Fri May 23 03:31:54 2008
@@ -49,8 +49,8 @@
private StaticMembershipInterceptor staticMembershipInterceptor;
public RpcRequestHandler(ConfigurationContext configurationContext,
- MembershipManager membershipManager,
- StaticMembershipInterceptor staticMembershipInterceptor) {
+ MembershipManager membershipManager,
+ StaticMembershipInterceptor staticMembershipInterceptor) {
this.configurationContext = configurationContext;
this.membershipManager = membershipManager;
this.staticMembershipInterceptor = staticMembershipInterceptor;
@@ -131,6 +131,19 @@
log.error(errMsg, e);
throw new RemoteProcessException(errMsg, e);
}
+ } else if (msg instanceof MemberListCommand) {
+ try { //TODO: What if we receive more than one member list message?
+ MemberListCommand command = (MemberListCommand) msg;
+ command.setMembershipManager(membershipManager);
+ command.setStaticMembershipInterceptor(staticMembershipInterceptor);
+ command.execute(configurationContext);
+
+ //TODO Send MEMBER_JOINED messages to all nodes
+ } catch (ClusteringFault e) {
+ String errMsg = "Cannot handle MEMBER_LIST message";
+ log.error(errMsg, e);
+ throw new RemoteProcessException(errMsg, e);
+ }
}
//TODO: If a WKA member fails, it should figure out the membership. The WKA member writes membership to a local file
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Fri May 23 03:31:54 2008
@@ -96,17 +96,17 @@
private MembershipManager membershipManager;
private RpcRequestHandler rpcRequestHandler;
private StaticMembershipInterceptor staticMembershipInterceptor;
- private org.apache.axis2.clustering.Member[] members;
+ private List<org.apache.axis2.clustering.Member> members;
public TribesClusterManager() {
parameters = new HashMap<String, Parameter>();
}
- public void setMembers(org.apache.axis2.clustering.Member[] members) {
+ public void setMembers(List<org.apache.axis2.clustering.Member> members) {
this.members = members;
}
- public org.apache.axis2.clustering.Member[] getMembers() {
+ public List<org.apache.axis2.clustering.Member> getMembers() {
return members;
}
@@ -140,8 +140,8 @@
addInterceptors(channel, domain, membershipScheme);
// Membership scheme handling
- //TODO: if it is a WKA scheme, connect to a WKA and get a list of members. Add the members
- // TODO: to the membership manager
+ // If it is a WKA scheme, connect to a WKA and get a list of members. Add the members
+ // to the membership manager
configureMembershipScheme(domain, membershipScheme);
channel.addChannelListener(channelListener);
@@ -169,6 +169,7 @@
membershipManager,
staticMembershipInterceptor);
rpcChannel = new RpcChannel(domain, channel, rpcRequestHandler);
+ membershipManager.setRpcChannel(rpcChannel);
log.info("Local Member " + TribesUtil.getLocalHost(channel));
@@ -197,16 +198,13 @@
Member[] sendTo = new Member[currentMembers.length - 1];
int j = 0;
for (Member currentMember : currentMembers) {
- if (!currentMember.equals(source)) {
+ if (!currentMember.equals(source)) { // Don't send back to the sender
sendTo[j] = currentMember;
j++;
}
}
- rpcChannel.send(sendTo,
- memberJoinedCommand,
- RpcChannel.ALL_REPLY,
- Channel.SEND_OPTIONS_ASYNCHRONOUS,
- 10000);
+ rpcChannel.send(sendTo, memberJoinedCommand, RpcChannel.ALL_REPLY,
+ Channel.SEND_OPTIONS_ASYNCHRONOUS, 10000);
} catch (ChannelException e) {
String msg = "Could not send MEMBER_JOINED message to group";
log.error(msg, e);
@@ -450,6 +448,7 @@
// We will add the member even if it is offline at this moment. When the
// member comes online, it will be detected by the GMS
staticMembershipInterceptor.addStaticMember(tribesMember);
+ membershipManager.addWellKnownMember(tribesMember);
if (canConnect(member)) {
membershipManager.memberAdded(tribesMember);
log.info("Added static member " + TribesUtil.getHost(tribesMember));
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Fri May 23 03:31:54 2008
@@ -24,8 +24,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.util.StringTokenizer;
-
public class TribesUtil {
private static Log log = LogFactory.getLog(TribesUtil.class);
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/ClusterManager.java Fri May 23 03:31:54 2008
@@ -24,6 +24,8 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.description.ParameterInclude;
+import java.util.List;
+
/**
* <p>
* This is the main interface in the Axis2 clustering implementation.
@@ -135,7 +137,7 @@
*
* @param members Members to be added
*/
- void setMembers(Member[] members);
+ void setMembers(List<Member> members);
/**
* Get the list of members in a
@@ -146,6 +148,6 @@
* @return The members if static group membership is used. If any other membership scheme is used,
* the values returned may not be valid
*/
- Member[] getMembers();
+ List<Member> getMembers();
}
\ No newline at end of file
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/clustering/Member.java Fri May 23 03:31:54 2008
@@ -42,4 +42,25 @@
public int getPort() {
return port;
}
+
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Member member = (Member) o;
+ return port == member.getPort() &&
+ !(hostName != null ? !hostName.equals(member.getHostName()) :
+ member.getHostName() != null);
+ }
+
+ public int hashCode() {
+ int result;
+ result = (hostName != null ? hostName.hashCode() : 0);
+ result = 31 * result + port;
+ return result;
+ }
}
Modified: webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java?rev=659492&r1=659491&r2=659492&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java (original)
+++ webservices/axis2/trunk/java/modules/kernel/src/org/apache/axis2/deployment/ClusterBuilder.java Fri May 23 03:31:54 2008
@@ -118,12 +118,12 @@
}
private void loadMembers(ClusterManager clusterManager, OMElement clusterElement) {
- clusterManager.setMembers(new Member[0]);
+ clusterManager.setMembers(new ArrayList<Member>());
Parameter membershipSchemeParam = clusterManager.getParameter("membershipScheme");
if (membershipSchemeParam != null) {
String membershipScheme = ((String) membershipSchemeParam.getValue()).trim();
if (membershipScheme.equals(ClusteringConstants.MembershipScheme.WKA_BASED)) {
- List members = new ArrayList();
+ List<Member> members = new ArrayList<Member>();
OMElement membersEle =
clusterElement.getFirstChildWithName(new QName("members"));
if (membersEle != null) {
@@ -137,7 +137,7 @@
members.add(new Member(hostName, Integer.parseInt(port)));
}
}
- clusterManager.setMembers((Member[]) members.toArray(new Member[members.size()]));
+ clusterManager.setMembers(members);
}
}
}