You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/07 15:15:09 UTC

svn commit: r609612 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes: ChannelListener.java TribesClusterManager.java TribesUtil.java

Author: azeez
Date: Mon Jan  7 06:15:04 2008
New Revision: 609612

URL: http://svn.apache.org/viewvc?rev=609612&view=rev
Log:
Handle duplicate message receipt


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=609612&r1=609611&r2=609612&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Mon Jan  7 06:15:04 2008
@@ -43,29 +43,48 @@
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 
 public class ChannelListener implements org.apache.catalina.tribes.ChannelListener {
     private static final Log log = LogFactory.getLog(ChannelListener.class);
 
+    /**
+     * The time a message lives in the receivedMessages Map
+     */
+    private static final int TIME_TO_LIVE = 5 * 60 * 1000; // 5 mins
+
     private DefaultContextManager contextManager;
     private DefaultConfigurationManager configurationManager;
     private TribesControlCommandProcessor controlCommandProcessor;
     private ChannelSender channelSender;
 
     private ConfigurationContext configurationContext;
+    private boolean synchronizeAllMembers;
+
+    private Map receivedMessages = new HashMap();
 
     public ChannelListener(ConfigurationContext configurationContext,
                            DefaultConfigurationManager configurationManager,
                            DefaultContextManager contextManager,
                            TribesControlCommandProcessor controlCommandProcessor,
-                           ChannelSender sender) {
+                           ChannelSender sender,
+                           boolean synchronizeAllMembers) {
         this.configurationManager = configurationManager;
         this.contextManager = contextManager;
         this.controlCommandProcessor = controlCommandProcessor;
         this.channelSender = sender;
         this.configurationContext = configurationContext;
+        this.synchronizeAllMembers = synchronizeAllMembers;
+
+        Timer cleanupTimer = new Timer();
+        cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
+                                         TIME_TO_LIVE,
+                                         TIME_TO_LIVE);
     }
 
     public void setContextManager(DefaultContextManager contextManager) {
@@ -104,9 +123,10 @@
             msg = XByteBuffer.deserialize(message,
                                           0,
                                           message.length,
-                                          (ClassLoader[])classLoaders.toArray(new ClassLoader[classLoaders.size()])); 
+                                          (ClassLoader[]) classLoaders.toArray(new ClassLoader[classLoaders.size()]));
         } catch (Exception e) {
-            log.error(e);
+            log.error("Cannot deserialize received message", e);
+            return;
         }
 
         // If the system has not still been intialized, reject all incoming messages, except the
@@ -128,27 +148,57 @@
     }
 
     private void processMessage(Serializable msg, Member sender) throws ClusteringFault {
-        //TODO: Reject duplicates that can be received due to retransmissions
-        //TODO: ACK implosion?
+        //TODO: Handle ACK implosion?
 
         if (msg instanceof ContextClusteringCommand && contextManager != null) {
             ContextClusteringCommand ctxCmd = (ContextClusteringCommand) msg;
+            String msgId = ctxCmd.getUniqueId();
+
+            // Check for duplicate messages and ignore duplicates in order to support at-most-once semantics
+            if (receivedMessages.containsKey(msgId)) {
+                log.debug("Received duplicate message " + ctxCmd);
+                receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was last received
+                return;
+            }
+            receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
+
+            // Process the message
             contextManager.process(ctxCmd);
 
             // Sending ACKs for ContextClusteringCommandCollection or
             // UpdateContextCommand is sufficient
-            if (msg instanceof ContextClusteringCommandCollection ||
-                msg instanceof UpdateContextCommand) {
-                AckCommand ackCmd = new AckCommand(ctxCmd.getUniqueId());
-
-                // Send the ACK
-                this.channelSender.sendToMember(ackCmd, sender);
+            if (synchronizeAllMembers) { // Send ACK only if the relevant cluster config parameter is set
+                if (msg instanceof ContextClusteringCommandCollection ||
+                    msg instanceof UpdateContextCommand) {
+                    AckCommand ackCmd = new AckCommand(msgId);
+
+                    // Send the ACK
+                    this.channelSender.sendToMember(ackCmd, sender);
+                }
             }
         } else if (msg instanceof ConfigurationClusteringCommand &&
                    configurationManager != null) {
             configurationManager.process((ConfigurationClusteringCommand) msg);
         } else if (msg instanceof ControlCommand && controlCommandProcessor != null) {
             controlCommandProcessor.process((ControlCommand) msg, sender);
+        }
+    }
+
+    private class ReceivedMessageCleanupTask extends TimerTask {
+
+        public void run() {
+            List toBeRemoved = new ArrayList();
+            for (Iterator iterator = receivedMessages.keySet().iterator(); iterator.hasNext();) {
+                String msgId = (String) iterator.next();
+                Long recdTime = (Long) receivedMessages.get(msgId);
+                if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
+                    toBeRemoved.add(msgId);
+                }
+            }
+            for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
+                String msgId = (String) iterator.next();
+                receivedMessages.remove(msgId);
+            }
         }
     }
 }

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java?rev=609612&r1=609611&r2=609612&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesClusterManager.java Mon Jan  7 06:15:04 2008
@@ -128,7 +128,8 @@
                                               configurationManager,
                                               contextManager,
                                               controlCmdProcessor,
-                                              sender);
+                                              sender,
+                                              synchronizeAllMembers());
 
         controlCmdProcessor.setChannelSender(sender);
         channel = new GroupChannel();
@@ -202,13 +203,13 @@
         log.info("Local Tribes Member " + TribesUtil.getLocalHost(channel));
         TribesUtil.printMembers(members);
 
-        // If configuration management is enabled, get the latest config from a neighbour
+        // If configuration management is enabled, get the latest config from a neighbour  TODO: from the longest living neighbour
         if (configurationManager != null) {
             configurationManager.setSender(sender);
             getInitializationMessage(members, sender, new GetConfigurationCommand());
         }
 
-        // If context replication is enabled, get the latest state from a neighbour
+        // If context replication is enabled, get the latest state from a neighbour  TODO: from the longest living neighbour
         if (contextManager != null) {
             contextManager.setSender(sender);
             channelListener.setContextManager(contextManager);
@@ -233,7 +234,7 @@
                                           ClusteringCommand command) {
         // If there is at least one member in the Tribe, get the current initialization info from a member
         Random random = new Random();
-        int numberOfTries = 0; // Don't keep on trying infinitely
+        int numberOfTries = 0; // Don't keep on trying indefinitely
 
         // Keep track of members to whom we already sent an initialization command
         // Do not send another request to these members
@@ -246,6 +247,10 @@
             // While there are members and GetStateResponseCommand is not received do the following
             try {
                 members = channel.getMembers();
+
+                //TODO: Can get longest alive member, willdo with membership awareness
+                members[0].getMemberAliveTime();
+
                 int memberIndex = random.nextInt(members.length);
                 Member member = members[memberIndex];
                 if (!sentMembersList.contains(TribesUtil.getHost(member))) {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java?rev=609612&r1=609611&r2=609612&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/TribesUtil.java Mon Jan  7 06:15:04 2008
@@ -57,4 +57,19 @@
     public static String getLocalHost(Channel channel) {
         return getHost(channel.getLocalMember(true));
     }
+
+    public static Member getLongestAliveMember(Member[] members) {
+        Member longestAliveMember = null;
+        if (members.length > 0) {
+            long longestAliveTime = members[0].getMemberAliveTime();
+            for (int i = 0; i < members.length; i++) {
+                Member member = members[i];
+                if (longestAliveTime < member.getMemberAliveTime()) {
+                    longestAliveTime = member.getMemberAliveTime();
+                    longestAliveMember = member;
+                }
+            }
+        }
+        return longestAliveMember;
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org