You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/01/07 17:06:53 UTC
svn commit: r609656 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes:
AckManager.java ChannelSender.java MembershipManager.java
TribesClusterManager.java TribesMembershipListener.java TribesUtil.java
Author: azeez
Date: Mon Jan 7 08:06:51 2008
New Revision: 609656
URL: http://svn.apache.org/viewvc?rev=609656&view=rev
Log:
1. More improvements to membership management
2. When a node comes up, first try to get the config & state information from the longest-living member of the group
Added:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AckManager.java Mon Jan 7 08:06:51 2008
@@ -45,7 +45,7 @@
* When a particular member send an ACK for a particular message, the ACK is stored here
*
* @param messageUniqueId ID of the message being ACKed
- * @param memberId The ID of the member who ACKed the above message
+ * @param memberId The ID of the member who ACKed the above message
*/
public static void addAcknowledgement(String messageUniqueId,
String memberId) {
@@ -67,9 +67,9 @@
* and then return false.
*
* @param messageUniqueId ID of the message being ACKed
- * @param sender
+ * @param sender The utility for sending the message
* @return true - if all members have ACKed the message, false - otherwise
- * @throws ClusteringFault
+ * @throws ClusteringFault If an error occurs while retransmitting a message
*/
public static boolean isMessageAcknowledged(String messageUniqueId,
ChannelSender sender) throws ClusteringFault {
@@ -86,7 +86,7 @@
// Check that all members in the memberList are same as the total member list,
// which will indicate that all members have ACKed the message
- Member[] members = sender.getChannel().getMembers();
+ Member[] members = MembershipManager.getMembers();
if (members.length == 0) {
isAcknowledged = true;
} else {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jan 7 08:06:51 2008
@@ -45,67 +45,46 @@
}
long timeToSend = 0;
- // Keep retrying, since at the point of trying to send the msg, a member may leave the group
- // causing a view change. All nodes in a view should get the msg
- //TODO: Sometimes Tribes incorrectly detects that a member has left a group
- while (true) {
- if (channel.getMembers().length > 0) {
- try {
- long start = System.currentTimeMillis();
- channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
- timeToSend = System.currentTimeMillis() - start;
- log.debug("Sent " + msg + " to group");
- break;
- } catch (NotSerializableException e) {
- String message = "Could not send command message " + msg +
- " to group since it is not serializable.";
- log.error(message, e);
- throw new ClusteringFault(message, e);
- } catch (Exception e) {
- String message = "Error sending command message : " + msg +
- ". Reason " + e.getMessage();
- log.warn(message, e);
- }
- } else {
- break;
- }
- }
- return timeToSend;
- }
-
- public long sendToGroup(ClusteringCommand msg, Member[] members) throws ClusteringFault {
- if (channel == null) {
- return 0;
- }
- long timeToSend = 0;
+ Member[] members = MembershipManager.getMembers();
// Keep retrying, since at the point of trying to send the msg, a member may leave the group
// causing a view change. All nodes in a view should get the msg
//TODO: Sometimes Tribes incorrectly detects that a member has left a group
- while (true) {
- if (channel.getMembers().length > 0) {
- try {
- long start = System.currentTimeMillis();
- channel.send(channel.getMembers(), toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
- timeToSend = System.currentTimeMillis() - start;
- log.debug("Sent " + msg + " to group");
- break;
- } catch (NotSerializableException e) {
- String message = "Could not send command message " + msg +
- " to group since it is not serializable.";
- log.error(message, e);
- throw new ClusteringFault(message, e);
- } catch (ChannelException e) {
-
- } catch (Exception e) {
- String message = "Error sending command message : " + msg +
- ". Reason " + e.getMessage();
- log.warn(message, e);
+// while (true) {
+ if (members.length > 0) {
+ try {
+ long start = System.currentTimeMillis();
+ channel.send(members, toByteMessage(msg), Channel.SEND_OPTIONS_USE_ACK);
+ timeToSend = System.currentTimeMillis() - start;
+ log.debug("Sent " + msg + " to group");
+// break;
+ } catch (NotSerializableException e) {
+ String message = "Could not send command message " + msg +
+ " to group since it is not serializable.";
+ log.error(message, e);
+ throw new ClusteringFault(message, e);
+ } catch (ChannelException e) {
+ //TODO: What to do for faulty members
+ ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+ for (int i = 0; i < faultyMembers.length; i++) {
+ ChannelException.FaultyMember faultyMember = faultyMembers[i];
+ Member member = faultyMember.getMember();
+ log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
+ faultyMember.getCause(), faultyMember.getCause());
+
+ //TODO: Shall we try to resend to these members?
}
- } else {
- break;
+
+ } catch (Exception e) {
+ String message = "Error sending command message : " + msg +
+ ". Reason " + e.getMessage();
+ log.warn(message, e);
}
}
+// else {
+// break;
+// }
+// }
return timeToSend;
}
@@ -146,6 +125,10 @@
" since it is not serializable.";
log.error(message, e);
throw new ClusteringFault(message, e);
+ } catch (ChannelException e) {
+ ChannelException.FaultyMember[] faultyMembers = e.getFaultyMembers();
+ log.error("Member " + TribesUtil.getHost(member) + " is faulty. Cause " +
+ faultyMembers[0].getCause(), faultyMembers[0].getCause());
} catch (Exception e) {
String message = "Could not send message to " + TribesUtil.getHost(member) +
". Reason " + e.getMessage();
Added: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java?rev=609656&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java (added)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/MembershipManager.java Mon Jan 7 08:06:51 2008
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.axis2.clustering.tribes;
+
+import org.apache.catalina.tribes.Member;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/** Responsible for managing the membership: keeps the members reported via memberAdded/memberDisappeared. */
+public class MembershipManager {
+    private static final List members = new ArrayList();
+
+    /** Records a member that has joined the group. */
+    public synchronized static void memberAdded(Member member) {
+        members.add(member);
+    }
+
+    /** Forgets a member that has left the group. */
+    public synchronized static void memberDisappeared(Member member) {
+        members.remove(member);
+    }
+
+    /** Returns a snapshot array of the currently known members (empty if there are none). */
+    public synchronized static Member[] getMembers() {
+        return (Member[]) members.toArray(new Member[members.size()]);
+    }
+
+    /** Returns the member with the greatest alive time (the first member seeds the search), or null if none. */
+    public synchronized static Member getLongestAliveMember() {
+        Member longestAliveMember = null;
+        long longestAliveTime = 0;
+        for (int i = 0; i < members.size(); i++) {
+            Member member = (Member) members.get(i);
+            if (longestAliveMember == null || longestAliveTime < member.getMemberAliveTime()) {
+                longestAliveTime = member.getMemberAliveTime();
+                longestAliveMember = member;
+            }
+        }
+        return longestAliveMember;
+    }
+
+    /** Returns a randomly chosen member, or null if no members are known. */
+    public synchronized static Member getRandomMember() {
+        if (members.isEmpty()) {
+            return null;
+        }
+        int memberIndex = new Random().nextInt(members.size());
+        return (Member) members.get(memberIndex);
+    }
+}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan 7 08:06:51 2008
@@ -22,7 +22,6 @@
import org.apache.axiom.om.OMElement;
import org.apache.axis2.AxisFault;
import org.apache.axis2.clustering.ClusterManager;
-import org.apache.axis2.clustering.ClusteringCommand;
import org.apache.axis2.clustering.ClusteringConstants;
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.RequestBlockingHandler;
@@ -31,6 +30,7 @@
import org.apache.axis2.clustering.context.ClusteringContextListener;
import org.apache.axis2.clustering.context.ContextManager;
import org.apache.axis2.clustering.context.DefaultContextManager;
+import org.apache.axis2.clustering.control.ControlCommand;
import org.apache.axis2.clustering.control.GetConfigurationCommand;
import org.apache.axis2.clustering.control.GetStateCommand;
import org.apache.axis2.context.ConfigurationContext;
@@ -55,7 +55,6 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Random;
public class TribesClusterManager implements ClusterManager {
private static final Log log = LogFactory.getLog(TribesClusterManager.class);
@@ -199,21 +198,21 @@
}
sender.setChannel(channel);
- Member[] members = channel.getMembers();
+// Member[] members = channel.getMembers();
log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
- TribesUtil.printMembers(members);
+ TribesUtil.printMembers();
// If configuration management is enabled, get the latest config from a neighbour TODO: from the longest living neighbour
if (configurationManager != null) {
configurationManager.setSender(sender);
- getInitializationMessage(members, sender, new GetConfigurationCommand());
+ getInitializationMessage(sender, new GetConfigurationCommand());
}
// If context replication is enabled, get the latest state from a neighbour TODO: from the longest living neighbour
if (contextManager != null) {
contextManager.setSender(sender);
channelListener.setContextManager(contextManager);
- getInitializationMessage(members, sender, new GetStateCommand());
+ getInitializationMessage(sender, new GetStateCommand());
ClusteringContextListener contextListener = new ClusteringContextListener(sender);
configurationContext.addContextListener(contextListener);
}
@@ -225,20 +224,18 @@
* Get some information from a neighbour. This information will be used by this node to
* initialize itself
*
- * @param members
- * @param sender
- * @param command
+ * @param sender The utility for sending messages to the channel
+ * @param command The control command to send
*/
- private void getInitializationMessage(Member[] members,
- ChannelSender sender,
- ClusteringCommand command) {
- // If there is at least one member in the Tribe, get the current initialization info from a member
- Random random = new Random();
+ private void getInitializationMessage(ChannelSender sender, ControlCommand command) {
+ // If there is at least one member in the cluster,
+ // get the current initialization info from a member
int numberOfTries = 0; // Don't keep on trying indefinitely
// Keep track of members to whom we already sent an initialization command
// Do not send another request to these members
List sentMembersList = new ArrayList();
+ Member[] members = MembershipManager.getMembers();
while (members.length > 0 &&
configurationContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null
@@ -246,13 +243,9 @@
// While there are members and GetStateResponseCommand is not received do the following
try {
- members = channel.getMembers();
-
- //TODO: Can get longest alive member, willdo with membership awareness
- members[0].getMemberAliveTime();
-
- int memberIndex = random.nextInt(members.length);
- Member member = members[memberIndex];
+ Member member = (numberOfTries == 0) ?
+ MembershipManager.getLongestAliveMember() : // First try to get from the longest alive member
+ MembershipManager.getRandomMember(); // Else get from a random member
if (!sentMembersList.contains(TribesUtil.getHost(member))) {
long tts = sender.sendToMember(command, member);
configurationContext.
@@ -260,12 +253,13 @@
new Long(tts));
sentMembersList.add(TribesUtil.getHost(member));
log.debug("WAITING FOR STATE INITIALIZATION MESSAGE...");
- Thread.sleep(tts + 5);
+ Thread.sleep(tts + 5 * (numberOfTries + 1));
}
} catch (Exception e) {
log.error(e);
break;
}
+ members = MembershipManager.getMembers();
numberOfTries++;
}
}
@@ -334,9 +328,6 @@
public boolean synchronizeAllMembers() {
Parameter syncAllParam = getParameter(ClusteringConstants.SYNCHRONIZE_ALL_MEMBERS);
- if (syncAllParam == null) {
- return true;
- }
- return Boolean.parseBoolean((String) syncAllParam.getValue());
+ return syncAllParam == null || Boolean.parseBoolean((String) syncAllParam.getValue());
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesMembershipListener.java Mon Jan 7 08:06:51 2008
@@ -32,11 +32,14 @@
public void memberAdded(Member member) {
log.info("New member " + TribesUtil.getHost(member) + " joined cluster.");
+ MembershipManager.memberAdded(member);
// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
}
public void memberDisappeared(Member member) {
log.info("Member " + TribesUtil.getHost(member) + " left cluster");
+ MembershipManager.memberDisappeared(member);
+
// System.err.println("++++++ IS COORD="+TribesClusterManager.nbc.isCoordinator());
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=609656&r1=609655&r2=609656&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan 7 08:06:51 2008
@@ -28,7 +28,8 @@
private static Log log = LogFactory.getLog(TribesUtil.class);
- public static void printMembers(Member[] members) {
+ public static void printMembers() {
+ Member[] members = MembershipManager.getMembers();
if (members != null) {
int length = members.length;
if (length > 0) {
@@ -56,20 +57,5 @@
public static String getLocalHost(Channel channel) {
return getHost(channel.getLocalMember(true));
- }
-
- public static Member getLongestAliveMember(Member[] members) {
- Member longestAliveMember = null;
- if (members.length > 0) {
- long longestAliveTime = members[0].getMemberAliveTime();
- for (int i = 0; i < members.length; i++) {
- Member member = members[i];
- if (longestAliveTime < member.getMemberAliveTime()) {
- longestAliveTime = member.getMemberAliveTime();
- longestAliveMember = member;
- }
- }
- }
- return longestAliveMember;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org