You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/09 08:06:17 UTC

svn commit: r610282 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering: control/GetConfigurationResponseCommand.java control/GetStateResponseCommand.java tribes/ChannelListener.java

Author: azeez
Date: Tue Jan  8 23:06:15 2008
New Revision: 610282

URL: http://svn.apache.org/viewvc?rev=610282&view=rev
Log:
 Improvements to duplicate message handling (to support at-most-once message processing semantics)


Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java?rev=610282&r1=610281&r2=610282&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java Tue Jan  8 23:06:15 2008
@@ -24,6 +24,8 @@
 import org.apache.axis2.description.AxisService;
 import org.apache.axis2.description.AxisModule;
 import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 import java.io.FileNotFoundException;
 import java.util.Iterator;
@@ -33,6 +35,8 @@
  */
 public class GetConfigurationResponseCommand extends ControlCommand {
 
+    private static final Log log = LogFactory.getLog(GetConfigurationResponseCommand.class);
+
     private String[] serviceGroups;
 
     public void execute(ConfigurationContext configContext) throws ClusteringFault {
@@ -41,6 +45,7 @@
         // Run this code only if this node is not already initialized
         if (configContext.
                 getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+            log.info("Received configuration initialization message");
             if (serviceGroups != null) {
 
                 // Load all the service groups that are sent by the neighbour
@@ -58,6 +63,8 @@
                         }
                     }
                 }
+
+                //TODO: Check this code. Need to see what happens to Data Services etc. also services deployed from within modules
 
                 // Unload all service groups which were not sent by the neighbour,
                 // but have been currently loaded

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=610282&r1=610281&r2=610282&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Tue Jan  8 23:06:15 2008
@@ -22,12 +22,16 @@
 import org.apache.axis2.clustering.ClusteringFault;
 import org.apache.axis2.clustering.context.ContextClusteringCommand;
 import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * 
  */
 public class GetStateResponseCommand extends ControlCommand {
 
+    private static final Log log = LogFactory.getLog(GetStateResponseCommand.class);
+
     private ContextClusteringCommand[] commands;
 
     public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
@@ -35,6 +39,7 @@
         // Run this code only if this node is not already initialized
         if (configurationContext.
                 getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+            log.info("Received state initialization message");
             configurationContext.
                     setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
             if (commands != null) {

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=610282&r1=610281&r2=610282&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Tue Jan  8 23:06:15 2008
@@ -81,10 +81,12 @@
         this.configurationContext = configurationContext;
         this.synchronizeAllMembers = synchronizeAllMembers;
 
-        Timer cleanupTimer = new Timer();
-        cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
-                                         TIME_TO_LIVE,
-                                         TIME_TO_LIVE);
+        if (synchronizeAllMembers) {
+            Timer cleanupTimer = new Timer();
+            cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
+                                             TIME_TO_LIVE,
+                                             TIME_TO_LIVE);
+        }
     }
 
     public void setContextManager(DefaultContextManager contextManager) {
@@ -157,14 +159,17 @@
             String msgId = ctxCmd.getUniqueId();
 
             // Check for duplicate messages and ignore duplicates in order to support at-most-once semantics
-            if (receivedMessages.containsKey(msgId)) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Received duplicate message " + ctxCmd);
+            if (synchronizeAllMembers) { // Duplicates can be received only if an ACK & retransmit mechanism is used
+                if (receivedMessages.containsKey(msgId)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Received duplicate message " + ctxCmd);
+                    }
+                    return;
+                }
+                synchronized (receivedMessages) {
+                    receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
                 }
-                receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was last received
-                return;
             }
-            receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
 
             // Process the message
             contextManager.process(ctxCmd);
@@ -192,18 +197,20 @@
 
         public void run() {
             List toBeRemoved = new ArrayList();
-            for (Iterator iterator = receivedMessages.keySet().iterator(); iterator.hasNext();) {
-                String msgId = (String) iterator.next();
-                Long recdTime = (Long) receivedMessages.get(msgId);
-                if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
-                    toBeRemoved.add(msgId);
+            synchronized (receivedMessages) {
+                for (Iterator iter = receivedMessages.keySet().iterator(); iter.hasNext();) {
+                    String msgId = (String) iter.next();
+                    Long recdTime = (Long) receivedMessages.get(msgId);
+                    if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
+                        toBeRemoved.add(msgId);
+                    }
                 }
-            }
-            for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
-                String msgId = (String) iterator.next();
-                receivedMessages.remove(msgId);
-                if (log.isDebugEnabled()) {
-                    log.debug("Removed message " + msgId + " from received message buffer");
+                for (Iterator iter = toBeRemoved.iterator(); iter.hasNext();) {
+                    String msgId = (String) iter.next();
+                    receivedMessages.remove(msgId);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Removed message " + msgId + " from received message buffer");
+                    }
                 }
             }
         }



---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org