You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by az...@apache.org on 2008/06/23 16:21:49 UTC

svn commit: r670614 - in /webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes: AtMostOnceInterceptor.java ChannelSender.java

Author: azeez
Date: Mon Jun 23 07:21:49 2008
New Revision: 670614

URL: http://svn.apache.org/viewvc?rev=670614&view=rev
Log:
The AtMostOnceInterceptor should keep track of only those messages that must be processed at most once; messages sent without that option are passed straight through.

Modified:
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
    webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=670614&r1=670613&r2=670614&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Mon Jun 23 07:21:49 2008
@@ -50,13 +50,17 @@
     }
 
     public void messageReceived(ChannelMessage msg) {
-        synchronized (receivedMessages) {
-            if (receivedMessages.get(msg) == null) {  // If it is a new message, keep track of it
-                receivedMessages.put(msg, System.currentTimeMillis());
-                super.messageReceived(msg);
-            } else {  // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
-                log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress()));
+        if (okToProcess(msg.getOptions())) {
+            synchronized (receivedMessages) {
+                if (receivedMessages.get(msg) == null) {  // If it is a new message, keep track of it
+                    receivedMessages.put(msg, System.currentTimeMillis());
+                    super.messageReceived(msg);
+                } else {  // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
+                    log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress()));
+                }
             }
+        } else {
+            super.messageReceived(msg);
         }
     }
 

Modified: webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java
URL: http://svn.apache.org/viewvc/webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java?rev=670614&r1=670613&r2=670614&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java (original)
+++ webservices/axis2/trunk/java/modules/clustering/src/org/apache/axis2/clustering/tribes/ChannelSender.java Mon Jun 23 07:21:49 2008
@@ -63,11 +63,13 @@
                     channel.send(members, toByteMessage(msg),
                                  Channel.SEND_OPTIONS_USE_ACK |
                                  Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
-                                 TribesConstants.MSG_ORDER_OPTION);
+                                 TribesConstants.MSG_ORDER_OPTION |
+                                 TribesConstants.AT_MOST_ONCE_OPTION);
                 } else {
                     channel.send(members, toByteMessage(msg),
                                  Channel.SEND_OPTIONS_ASYNCHRONOUS |
-                                 TribesConstants.MSG_ORDER_OPTION);
+                                 TribesConstants.MSG_ORDER_OPTION |
+                                 TribesConstants.AT_MOST_ONCE_OPTION);
                 }
                 if (log.isDebugEnabled()) {
                     log.debug("Sent " + msg + " to group");
@@ -122,7 +124,10 @@
         try {
             if (member.isReady()) {
                 channel.send(new Member[]{member}, toByteMessage(cmd),
-                             Channel.SEND_OPTIONS_USE_ACK | Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
+                             Channel.SEND_OPTIONS_USE_ACK |
+                             Channel.SEND_OPTIONS_SYNCHRONIZED_ACK |
+                             TribesConstants.MSG_ORDER_OPTION |
+                             TribesConstants.AT_MOST_ONCE_OPTION);
                 if (log.isDebugEnabled()) {
                     log.debug("Sent " + cmd + " to " + TribesUtil.getName(member));
                 }