You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2010/06/22 20:53:36 UTC

svn commit: r956974 - /axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java

Author: azeez
Date: Tue Jun 22 18:53:35 2010
New Revision: 956974

URL: http://svn.apache.org/viewvc?rev=956974&view=rev
Log:
Improvements as suggested in https://issues.apache.org/jira/browse/AXIS2-4749

Modified:
    axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java

Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=956974&r1=956973&r2=956974&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Tue Jun 22 18:53:35 2010
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,8 +36,8 @@ import java.util.Map;
 public class AtMostOnceInterceptor extends ChannelInterceptorBase {          
 
     private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
-    private static final Map<ChannelMessage, Long> receivedMessages =
-            new HashMap<ChannelMessage, Long>();
+    private static final Map<MessageId, Long> receivedMessages =
+            new HashMap<MessageId, Long>();
 
     /**
      * The time a message lives in the receivedMessages Map
@@ -52,8 +53,9 @@ public class AtMostOnceInterceptor exten
     public void messageReceived(ChannelMessage msg) {
         if (okToProcess(msg.getOptions())) {
             synchronized (receivedMessages) {
-                if (receivedMessages.get(msg) == null) {  // If it is a new message, keep track of it
-                    receivedMessages.put(msg, System.currentTimeMillis());
+                MessageId msgId = new MessageId(msg.getUniqueId());
+                if (receivedMessages.get(msgId) == null) {  // If it is a new message, keep track of it
+                    receivedMessages.put(msgId, System.currentTimeMillis());
                     super.messageReceived(msg);
                 } else {  // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
                     log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress()));
@@ -64,7 +66,7 @@ public class AtMostOnceInterceptor exten
         }
     }
 
-    private class MessageCleanupTask implements Runnable {
+    private static class MessageCleanupTask implements Runnable {
 
         public void run() {
             while (true) { // This task should never terminate
@@ -74,20 +76,20 @@ public class AtMostOnceInterceptor exten
                     e.printStackTrace();
                 }
                 try {
-                    List<ChannelMessage> toBeRemoved = new ArrayList<ChannelMessage>();
+                    List<MessageId> toBeRemoved = new ArrayList<MessageId>();
                     Thread.yield();
                     synchronized (receivedMessages) {
-                        for (ChannelMessage msg : receivedMessages.keySet()) {
-                            long arrivalTime = receivedMessages.get(msg);
+                        for (MessageId msgId : receivedMessages.keySet()) {
+                            long arrivalTime = receivedMessages.get(msgId);
                             if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
-                                toBeRemoved.add(msg);
+                                toBeRemoved.add(msgId);
                                 if (toBeRemoved.size() > 10000) { // Do not allow this thread to run for too long
                                     break;
                                 }
                             }
                         }
-                        for (ChannelMessage msg : toBeRemoved) {
-                            receivedMessages.remove(msg);
+                        for (MessageId msgId : toBeRemoved) {
+                            receivedMessages.remove(msgId);
                             if (log.isDebugEnabled()) {
                                 log.debug("Cleaned up message ");
                             }
@@ -99,4 +101,38 @@ public class AtMostOnceInterceptor exten
             }
         }
     }
+
+    /**
+     * Represents a Message ID
+     */
+    private static class MessageId {
+        private byte[] id;
+
+        private MessageId(byte[] id) {
+            this.id = id;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            MessageId messageId = (MessageId) o;
+
+            if (!Arrays.equals(id, messageId.id)) {
+                return false;
+            }
+
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(id);
+        }
+    }
 }