You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-commits@axis.apache.org by az...@apache.org on 2010/06/22 20:53:36 UTC
svn commit: r956974 -
/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
Author: azeez
Date: Tue Jun 22 18:53:35 2010
New Revision: 956974
URL: http://svn.apache.org/viewvc?rev=956974&view=rev
Log:
Improvements as suggested in https://issues.apache.org/jira/browse/AXIS2-4749
Modified:
axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
Modified: axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java
URL: http://svn.apache.org/viewvc/axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java?rev=956974&r1=956973&r2=956974&view=diff
==============================================================================
--- axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java (original)
+++ axis/axis2/java/core/trunk/modules/clustering/src/org/apache/axis2/clustering/tribes/AtMostOnceInterceptor.java Tue Jun 22 18:53:35 2010
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -35,8 +36,8 @@ import java.util.Map;
public class AtMostOnceInterceptor extends ChannelInterceptorBase {
private static Log log = LogFactory.getLog(AtMostOnceInterceptor.class);
- private static final Map<ChannelMessage, Long> receivedMessages =
- new HashMap<ChannelMessage, Long>();
+ private static final Map<MessageId, Long> receivedMessages =
+ new HashMap<MessageId, Long>();
/**
* The time a message lives in the receivedMessages Map
@@ -52,8 +53,9 @@ public class AtMostOnceInterceptor exten
public void messageReceived(ChannelMessage msg) {
if (okToProcess(msg.getOptions())) {
synchronized (receivedMessages) {
- if (receivedMessages.get(msg) == null) { // If it is a new message, keep track of it
- receivedMessages.put(msg, System.currentTimeMillis());
+ MessageId msgId = new MessageId(msg.getUniqueId());
+ if (receivedMessages.get(msgId) == null) { // If it is a new message, keep track of it
+ receivedMessages.put(msgId, System.currentTimeMillis());
super.messageReceived(msg);
} else { // If it is a duplicate message, discard it. i.e. dont call super.messageReceived
log.info("Duplicate message received from " + TribesUtil.getName(msg.getAddress()));
@@ -64,7 +66,7 @@ public class AtMostOnceInterceptor exten
}
}
- private class MessageCleanupTask implements Runnable {
+ private static class MessageCleanupTask implements Runnable {
public void run() {
while (true) { // This task should never terminate
@@ -74,20 +76,20 @@ public class AtMostOnceInterceptor exten
e.printStackTrace();
}
try {
- List<ChannelMessage> toBeRemoved = new ArrayList<ChannelMessage>();
+ List<MessageId> toBeRemoved = new ArrayList<MessageId>();
Thread.yield();
synchronized (receivedMessages) {
- for (ChannelMessage msg : receivedMessages.keySet()) {
- long arrivalTime = receivedMessages.get(msg);
+ for (MessageId msgId : receivedMessages.keySet()) {
+ long arrivalTime = receivedMessages.get(msgId);
if (System.currentTimeMillis() - arrivalTime >= TIMEOUT) {
- toBeRemoved.add(msg);
+ toBeRemoved.add(msgId);
if (toBeRemoved.size() > 10000) { // Do not allow this thread to run for too long
break;
}
}
}
- for (ChannelMessage msg : toBeRemoved) {
- receivedMessages.remove(msg);
+ for (MessageId msgId : toBeRemoved) {
+ receivedMessages.remove(msgId);
if (log.isDebugEnabled()) {
log.debug("Cleaned up message ");
}
@@ -99,4 +101,38 @@ public class AtMostOnceInterceptor exten
}
}
}
+
+ /**
+ * Map key wrapping a message's unique ID bytes; equality and hash code are computed from the array contents
+ */
+ private static class MessageId {
+ private byte[] id;
+
+ private MessageId(byte[] id) {
+ this.id = id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ MessageId messageId = (MessageId) o;
+
+ if (!Arrays.equals(id, messageId.id)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(id);
+ }
+ }
}