You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/01/09 08:06:17 UTC
svn commit: r610282 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering:
control/GetConfigurationResponseCommand.java
control/GetStateResponseCommand.java tribes/ChannelListener.java
Author: azeez
Date: Tue Jan 8 23:06:15 2008
New Revision: 610282
URL: http://svn.apache.org/viewvc?rev=610282&view=rev
Log:
Improvements to duplicate message handling (to support at-most-once message processing semantics)
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java?rev=610282&r1=610281&r2=610282&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetConfigurationResponseCommand.java Tue Jan 8 23:06:15 2008
@@ -24,6 +24,8 @@
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisModule;
import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import java.io.FileNotFoundException;
import java.util.Iterator;
@@ -33,6 +35,8 @@
*/
public class GetConfigurationResponseCommand extends ControlCommand {
+ private static final Log log = LogFactory.getLog(GetConfigurationResponseCommand.class);
+
private String[] serviceGroups;
public void execute(ConfigurationContext configContext) throws ClusteringFault {
@@ -41,6 +45,7 @@
// Run this code only if this node is not already initialized
if (configContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ log.info("Received configuration initialization message");
if (serviceGroups != null) {
// Load all the service groups that are sent by the neighbour
@@ -58,6 +63,8 @@
}
}
}
+
+ //TODO: Check this code. Need to see what happens to Data Services etc. also services deployed from within modules
// Unload all service groups which were not sent by the neighbour,
// but have been currently loaded
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java?rev=610282&r1=610281&r2=610282&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/control/GetStateResponseCommand.java Tue Jan 8 23:06:15 2008
@@ -22,12 +22,16 @@
import org.apache.axis2.clustering.ClusteringFault;
import org.apache.axis2.clustering.context.ContextClusteringCommand;
import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
*
*/
public class GetStateResponseCommand extends ControlCommand {
+ private static final Log log = LogFactory.getLog(GetStateResponseCommand.class);
+
private ContextClusteringCommand[] commands;
public void execute(ConfigurationContext configurationContext) throws ClusteringFault {
@@ -35,6 +39,7 @@
// Run this code only if this node is not already initialized
if (configurationContext.
getPropertyNonReplicable(ClusteringConstants.CLUSTER_INITIALIZED) == null) {
+ log.info("Received state initialization message");
configurationContext.
setNonReplicableProperty(ClusteringConstants.CLUSTER_INITIALIZED, "true");
if (commands != null) {
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java?rev=610282&r1=610281&r2=610282&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelListener.java Tue Jan 8 23:06:15 2008
@@ -81,10 +81,12 @@
this.configurationContext = configurationContext;
this.synchronizeAllMembers = synchronizeAllMembers;
- Timer cleanupTimer = new Timer();
- cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
- TIME_TO_LIVE,
- TIME_TO_LIVE);
+ if (synchronizeAllMembers) {
+ Timer cleanupTimer = new Timer();
+ cleanupTimer.scheduleAtFixedRate(new ReceivedMessageCleanupTask(),
+ TIME_TO_LIVE,
+ TIME_TO_LIVE);
+ }
}
public void setContextManager(DefaultContextManager contextManager) {
@@ -157,14 +159,17 @@
String msgId = ctxCmd.getUniqueId();
// Check for duplicate messages and ignore duplicates in order to support at-most-once semantics
- if (receivedMessages.containsKey(msgId)) {
- if (log.isDebugEnabled()) {
- log.debug("Received duplicate message " + ctxCmd);
+ if (synchronizeAllMembers) { // Duplicates can be received only if an ACK & retransmit mechanism is used
+ if (receivedMessages.containsKey(msgId)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received duplicate message " + ctxCmd);
+ }
+ return;
+ }
+ synchronized (receivedMessages) {
+ receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
}
- receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was last received
- return;
}
- receivedMessages.put(msgId, new Long(System.currentTimeMillis()));// Let's keep track of the message as well as the time at which it was first received
// Process the message
contextManager.process(ctxCmd);
@@ -192,18 +197,20 @@
public void run() {
List toBeRemoved = new ArrayList();
- for (Iterator iterator = receivedMessages.keySet().iterator(); iterator.hasNext();) {
- String msgId = (String) iterator.next();
- Long recdTime = (Long) receivedMessages.get(msgId);
- if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
- toBeRemoved.add(msgId);
+ synchronized (receivedMessages) {
+ for (Iterator iter = receivedMessages.keySet().iterator(); iter.hasNext();) {
+ String msgId = (String) iter.next();
+ Long recdTime = (Long) receivedMessages.get(msgId);
+ if (System.currentTimeMillis() - recdTime.longValue() >= TIME_TO_LIVE) {
+ toBeRemoved.add(msgId);
+ }
}
- }
- for (Iterator iterator = toBeRemoved.iterator(); iterator.hasNext();) {
- String msgId = (String) iterator.next();
- receivedMessages.remove(msgId);
- if (log.isDebugEnabled()) {
- log.debug("Removed message " + msgId + " from received message buffer");
+ for (Iterator iter = toBeRemoved.iterator(); iter.hasNext();) {
+ String msgId = (String) iter.next();
+ receivedMessages.remove(msgId);
+ if (log.isDebugEnabled()) {
+ log.debug("Removed message " + msgId + " from received message buffer");
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: axis-cvs-unsubscribe@ws.apache.org
For additional commands, e-mail: axis-cvs-help@ws.apache.org