You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by az...@apache.org on 2008/01/07 15:15:09 UTC
svn commit: r609612 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes:
ChannelListener.java TribesClusterManager.java TribesUtil.java
Author: azeez
Date: Mon Jan 7 06:15:04 2008
New Revision: 609612
URL: http://svn.apache.org/viewvc?rev=609612&view=rev
Log:
Handle duplicate message receipt
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=609612&r1=609611&r2=609612&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Mon Jan 7 06:15:04 2008
@@ -43,29 +43,48 @@
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
private static final Log log = LogFactory.getLog(ChannelListener.class);
+ /**
+ * The minimum time, in milliseconds, that a message ID is kept in the receivedMessages Map.
+ * The same value is used as both the initial delay and the period of the cleanup timer
+ * scheduled in the constructor, and a duplicate receipt refreshes an entry's timestamp,
+ * so an entry can stay in the Map for longer than this before a cleanup run removes it.
+ */
+ private static final int TIME_TO_LIVE = 5 * 60 * 1000; // 5 mins
+
private DefaultContextManager contextManager;
private DefaultConfigurationManager configurationManager;
private TribesControlCommandProcessor controlCommandProcessor;
private ChannelSender channelSender;
private ConfigurationContext configurationContext;
+ private boolean synchronizeAllMembers; // When true, processMessage may send an AckCommand back to the sender
+
+ private Map receivedMessages = new HashMap(); // Message unique ID (String) -> time of the most recent receipt (Long, from System.currentTimeMillis())
/**
 * Creates the listener, stores the supplied collaborators in the corresponding fields and
 * schedules a ReceivedMessageCleanupTask at a fixed rate: first run after TIME_TO_LIVE
 * milliseconds, then every TIME_TO_LIVE milliseconds.
 *
 * @param configurationContext    stored as-is
 * @param configurationManager    stored as-is; processMessage skips configuration commands when it is null
 * @param contextManager          stored as-is; processMessage skips context commands when it is null
 * @param controlCommandProcessor stored as-is; processMessage skips control commands when it is null
 * @param sender                  stored as channelSender and used by processMessage to send ACKs
 * @param synchronizeAllMembers   when true, processMessage sends ACKs for the command types it checks for
 */
public ChannelListener(ConfigurationContext configurationContext,
DefaultConfigurationManager configurationManager,
DefaultContextManager contextManager,
TribesControlCommandProcessor controlCommandProcessor,
- ChannelSender sender) {
+ ChannelSender sender,
+ boolean synchronizeAllMembers) {
this.configurationManager = configurationManager;
this.contextManager = contextManager;
this.controlCommandProcessor = controlCommandProcessor;
this.channelSender = sender;
this.configurationContext = configurationContext;
+ this.synchronizeAllMembers = synchronizeAllMembers;
+
+ Timer cleanupTimer = new Timer(); // NOTE(review): the Timer is only held in this local, so nothing can cancel it later; new Timer() uses a non-daemon thread
+ cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
+ TIME_TO_LIVE,
+ TIME_TO_LIVE);
}
public void setContextManager(DefaultContextManager contextManager) {
@@ -104,9 +123,10 @@
msg = XByteBuffer.deserialize(message,
0,
message.length,
- (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()]));
+ (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
} catch (Exception e) {
- log.error(e);
+ log.error("Cannot deserialize received message", e);
+ return;
}
// If the system has not yet been initialized, reject all incoming messages, except the
@@ -128,27 +148,57 @@
}
/**
 * Dispatches a deserialized message to the handler that matches its type.
 * A ContextClusteringCommand whose unique ID is already in receivedMessages is treated as a
 * duplicate: its timestamp is refreshed and it is not processed again. Otherwise the ID is
 * recorded, the command is passed to contextManager and, if synchronizeAllMembers is set and
 * the command is a ContextClusteringCommandCollection or an UpdateContextCommand, an
 * AckCommand carrying the same ID is sent back to the sender.
 * ConfigurationClusteringCommand and ControlCommand messages go to configurationManager and
 * controlCommandProcessor respectively; no duplicate check is applied to them here.
 * A message is silently ignored when its handler is null or its type matches none of the branches.
 *
 * @param msg    the deserialized message
 * @param sender the member the message came from; used as the ACK destination
 * @throws ClusteringFault presumably propagated from the handlers or the channel sender - not visible in this diff
 */
private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
- //TODO: Reject duplicates that can be received due to retransmissions
- //TODO: ACK implosion?
+ //TODO: Handle ACK implosion?
if (msg instanceof ContextClusteringCommand && contextManager != null) {
ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
+ String msgId = ctxCmd.getUniqueId();
+
+ // Ignore messages whose ID has already been seen, so that each context command is processed at most once
+ if (receivedMessages.containsKey(msgId)) { // NOTE(review): receivedMessages is a plain HashMap that ReceivedMessageCleanupTask also iterates and modifies from the Timer thread; no synchronization is visible in this diff
+ log.debug("Received duplicate message " + ctxCmd);
+ receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Refresh the entry with the time of this latest receipt, which postpones its cleanup
+ return;
+ }
+ receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Record the ID together with the time at which the message was first received
+
+ // Process the message
contextManager.process(ctxCmd);
// Sending ACKs for ContextClusteringCommandCollection or
// UpdateContextCommand is sufficient
- if (msg instanceof ContextClusteringCommandCollection ||
- msg instanceof UpdateContextCommand) {
- AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
-
- // Send the ACK
- this.channelSender.sendToMember(ackCmd, sender);
+ if (synchronizeAllMembers) { // Send ACK only if the relevant cluster config parameter is set
+ if (msg instanceof ContextClusteringCommandCollection ||
+ msg instanceof UpdateContextCommand) {
+ AckCommand ackCmd = new AckCommand(msgId);
+
+ // Send the ACK
+ this.channelSender.sendToMember(ackCmd, sender);
+ }
}
} else if (msg instanceof ConfigurationClusteringCommand &&
configurationManager != null) {
configurationManager.process((ConfigurationClusteringCommand) msg);
} else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
controlCommandProcessor.process((ControlCommand) msg, sender);
+ }
+ }
+
+ private class ReceivedMessageCleanupTask extends TimerTask {
+
+ public void run() {
+ List toBeRemoved = new ArrayList();
+ for (Iterator iterator = receivedMessages.keySet().iterator(); iterator.hasNext();) {
+ String msgId = (String) iterator.next();
+ Long recdTime = (Long) receivedMessages.get(msgId);
+ if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
+ toBeRemoved.add(msgId);
+ }
+ }
+ for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
+ String msgId = (String) iterator.next();
+ receivedMessages.remove(msgId);
+ }
}
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=609612&r1=609611&r2=609612&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan 7 06:15:04 2008
@@ -128,7 +128,8 @@
configurationManager,
contextManager,
controlCmdProcessor,
- sender);
+ sender,
+ synchronizeAllMembers());
controlCmdProcessor.setChannelSender(sender);
channel = new GroupChannel();
@@ -202,13 +203,13 @@
log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
TribesUtil.printMembers(members);
- // If configuration management is enabled, get the latest config from a neighbour
+ // If configuration management is enabled, get the latest config from a neighbour TODO: from the longest living neighbour
if (configurationManager != null) {
configurationManager.setSender(sender);
getInitializationMessage(members, sender, new GetConfigurationCommand());
}
- // If context replication is enabled, get the latest state from a neighbour
+ // If context replication is enabled, get the latest state from a neighbour TODO: from the longest living neighbour
if (contextManager != null) {
contextManager.setSender(sender);
channelListener.setContextManager(contextManager);
@@ -233,7 +234,7 @@
ClusteringCommand command) {
// If there is at least one member in the Tribe, get the current initialization info from a member
Random random = new Random();
- int numberOfTries = 0; // Don't keep on trying infinitely
+ int numberOfTries = 0; // Don't keep on trying indefinitely
// Keep track of members to whom we already sent an initialization command
// Do not send another request to these members
@@ -246,6 +247,10 @@
// While there are members and GetStateResponseCommand is not received do the following
try {
members = channel.getMembers();
+
+ //TODO: Can get the longest alive member here; will do along with membership awareness
+ members[0].getMemberAliveTime();
+
int memberIndex = random.nextInt(members.length);
Member member = members[memberIndex];
if (!sentMembersList.contains(TribesUtil.getHost(member))) {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=609612&r1=609611&r2=609612&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan 7 06:15:04 2008
@@ -57,4 +57,19 @@
/**
 * Returns the result of getHost for the local member of the given channel.
 *
 * @param channel the channel whose local member is looked up (via getLocalMember(true))
 * @return the host string produced by getHost for that member
 */
public static String getLocalHost(Channel channel) {
    Member localMember = channel.getLocalMember(true);
    return getHost(localMember);
}
+
+    /**
+     * Finds the member that reports the greatest alive time.
+     *
+     * @param members the members to examine; may be null or empty
+     * @return the member with the largest getMemberAliveTime() value (the earliest one in
+     *         the array when several tie), or null if there are no members
+     */
+    public static Member getLongestAliveMember(Member[] members) {
+        if (members == null || members.length == 0) {
+            return null;
+        }
+        // Seed the result with the first member so it is returned when no other member
+        // has been alive longer (previously null was returned in that case)
+        Member longestAliveMember = members[0];
+        long longestAliveTime = longestAliveMember.getMemberAliveTime();
+        for (int i = 1; i < members.length; i++) {
+            long aliveTime = members[i].getMemberAliveTime();
+            if (aliveTime > longestAliveTime) {
+                longestAliveTime = aliveTime;
+                longestAliveMember = members[i];
+            }
+        }
+        return longestAliveMember;
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org