You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/23 16:21:49 UTC
svn commit: r670614 - in
/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes:
AtMostOnceInterceptor.java ChannelSender.java
Author: azeez
Date: Mon Jun 23 07:21:49 2008
New Revision: 670614
URL: http://svn.apache.org/viewvc?rev=670614&view=rev
Log:
We should store only messages that have to be processed at-most once, in the AtMostOnceInterceptor
Modified:
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=670614&r1=670613&r2=670614&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Mon Jun 23 07:21:49 2008
@@ -50,13 +50,17 @@
}
public void messageReceived(ChannelMessage msg) {
- synchronized (receivedMessages) {
- if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
- receivedMessages.put(msg, System.currentTimeMillis());
- super.messageReceived(msg);
- } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
- log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress()));
+ if (okToProcess(msg.getOptions())) {
+ synchronized (receivedMessages) {
+ if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
+ receivedMessages.put(msg, System.currentTimeMillis());
+ super.messageReceived(msg);
+ } else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
+ log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress()));
+ }
}
+ } else {
+ super.messageReceived(msg);
}
}
Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=670614&r1=670613&r2=670614&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jun 23 07:21:49 2008
@@ -63,11 +63,13 @@
channel.send(members, toByteMessage(msg),
Channel.SEND_OPTIONS_USE_ACK |
Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
- TribesConstants.MSG_ORDER_OPTION);
+ TribesConstants.MSG_ORDER_OPTION |
+ TribesConstants.AT_MOST_ONCE_OPTION);
} else {
channel.send(members, toByteMessage(msg),
Channel.SEND_OPTIONS_ASYNCHRONOUS |
- TribesConstants.MSG_ORDER_OPTION);
+ TribesConstants.MSG_ORDER_OPTION |
+ TribesConstants.AT_MOST_ONCE_OPTION);
}
if (log.isDebugEnabled()) {
log.debug("Sent " + msg + " to group");
@@ -122,7 +124,10 @@
try {
if (member.isReady()) {
channel.send(new Member[]{member}, toByteMessage(cmd),
- Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+ Channel.SEND_OPTIONS_USE_ACK |
+ Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+ TribesConstants.MSG_ORDER_OPTION |
+ TribesConstants.AT_MOST_ONCE_OPTION);
if (log.isDebugEnabled()) {
log.debug("Sent " + cmd + " to " + TribesUtil.getName(member));
}