You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2013/12/04 16:34:06 UTC

git commit: Fix transmission id in coap retry filter

Updated Branches:
  refs/heads/trunk b0a9ee431 -> cf49c4706


Fix transmission id in coap retry filter


Project: http://git-wip-us.apache.org/repos/asf/mina/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/cf49c470
Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/cf49c470
Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/cf49c470

Branch: refs/heads/trunk
Commit: cf49c47060e2f736ef6227573ded2b9740823a4b
Parents: b0a9ee4
Author: Manuel Sangoi <ms...@sierrawireless.com>
Authored: Wed Dec 4 16:18:08 2013 +0100
Committer: jvermillard <jv...@apache.org>
Committed: Wed Dec 4 16:30:31 2013 +0100

----------------------------------------------------------------------
 .../apache/mina/coap/retry/CoapRetryFilter.java | 29 ++++++++++----------
 .../mina/coap/retry/CoapTransmission.java       | 20 +++++++++++++-
 .../mina/coap/retry/CoapRetryFilterTest.java    |  5 ++--
 .../mina/coap/retry/CoapTransmissionTest.java   | 13 ++++++---
 4 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina/blob/cf49c470/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
----------------------------------------------------------------------
diff --git a/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java b/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
index 374434d..83084ad 100644
--- a/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
+++ b/coap/src/main/java/org/apache/mina/coap/retry/CoapRetryFilter.java
@@ -57,10 +57,10 @@ public class CoapRetryFilter extends AbstractIoFilter {
     private ScheduledExecutorService retryExecutor = Executors.newSingleThreadScheduledExecutor();
 
     /** The confirmable messages waiting to be acknowledged */
-    private Map<Integer, CoapTransmission> inFlight = new ConcurrentHashMap<>();
+    private Map<String, CoapTransmission> inFlight = new ConcurrentHashMap<>();
 
     /** The list of processed messages used to handle duplicate copies of Confirmable messages */
-    private ExpiringMap<Integer, CoapMessage> processed = new ExpiringMap<Integer, CoapMessage>(retryExecutor);
+    private ExpiringMap<String, CoapMessage> processed = new ExpiringMap<String, CoapMessage>(retryExecutor);
 
     /**
      * {@inheritDoc}
@@ -70,6 +70,7 @@ public class CoapRetryFilter extends AbstractIoFilter {
         LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session);
 
         CoapMessage coapMsg = (CoapMessage) in;
+        String transmissionId = CoapTransmission.uniqueId(session, coapMsg);
 
         switch (coapMsg.getType()) {
         case NON_CONFIRMABLE:
@@ -78,10 +79,10 @@ public class CoapRetryFilter extends AbstractIoFilter {
             break;
         case CONFIRMABLE:
             // check if this is a duplicate of a message already processed
-            CoapMessage ack = processed.get(coapMsg.requestId());
+            CoapMessage ack = processed.get(transmissionId);
             if (ack != null) {
                 // stop the filter chain and send again the ack since it was probably lost
-                LOGGER.debug("Duplicated messages detected for ID {}", coapMsg.requestId());
+                LOGGER.debug("Duplicated messages detected with ID {} in session {}", coapMsg.requestId(), session);
                 controller.callWriteMessageForRead(ack);
             } else {
                 controller.callReadNextFilter(coapMsg);
@@ -90,11 +91,11 @@ public class CoapRetryFilter extends AbstractIoFilter {
             break;
         case ACK:
         case RESET:
-            CoapTransmission t = inFlight.get(coapMsg.requestId());
+            CoapTransmission t = inFlight.get(transmissionId);
             if (t != null) {
                 // cancel the scheduled retransmission
                 t.getRetryFuture().cancel(false);
-                inFlight.remove(coapMsg.requestId());
+                inFlight.remove(transmissionId);
             }
             controller.callReadNextFilter(coapMsg);
             break;
@@ -110,7 +111,7 @@ public class CoapRetryFilter extends AbstractIoFilter {
         LOGGER.debug("Processing a MESSAGE_WRITING for session {}", session);
 
         final CoapMessage coapMsg = (CoapMessage) message.getMessage();
-        final Integer coapMsgId = (Integer) coapMsg.requestId();
+        final String transmissionId = CoapTransmission.uniqueId(session, coapMsg);
 
         switch (coapMsg.getType()) {
 
@@ -120,17 +121,17 @@ public class CoapRetryFilter extends AbstractIoFilter {
         case RESET:
         case ACK:
             // let's keep track of the message to avoid processing it again in case of duplicate copy.
-            processed.put(coapMsgId, coapMsg);
+            processed.put(transmissionId, coapMsg);
 
             controller.callWriteNextFilter(message);
             break;
 
         case CONFIRMABLE:
             // initialize a transmission if this is not a retry
-            CoapTransmission t = inFlight.get(coapMsgId);
+            CoapTransmission t = inFlight.get(transmissionId);
             if (t == null) {
-                t = new CoapTransmission(coapMsg);
-                inFlight.put(coapMsgId, t);
+                t = new CoapTransmission(session, coapMsg);
+                inFlight.put(t.getId(), t);
             }
 
             // schedule a retry
@@ -138,15 +139,15 @@ public class CoapRetryFilter extends AbstractIoFilter {
 
                 @Override
                 public void run() {
-                    CoapTransmission t = inFlight.get(coapMsgId);
+                    CoapTransmission t = inFlight.get(transmissionId);
 
                     // send again the message if the maximum number of attempts is not reached
                     if (t != null && t.timeout()) {
-                        LOGGER.debug("Retry for message with ID {}", coapMsgId);
+                        LOGGER.debug("Retry for message with ID {}", coapMsg.requestId());
                         session.write(coapMsg);
                     } else {
                         // abort transmission
-                        LOGGER.debug("No more retry for message with ID {}", coapMsgId);
+                        LOGGER.debug("No more retry for message with ID {}", coapMsg.requestId());
                     }
                 }
             }, t.getNextTimeout(), TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/mina/blob/cf49c470/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
----------------------------------------------------------------------
diff --git a/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java b/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
index 6f1359c..5451625 100644
--- a/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
+++ b/coap/src/main/java/org/apache/mina/coap/retry/CoapTransmission.java
@@ -22,6 +22,7 @@ package org.apache.mina.coap.retry;
 import java.util.Random;
 import java.util.concurrent.ScheduledFuture;
 
+import org.apache.mina.api.IoSession;
 import org.apache.mina.coap.CoapMessage;
 
 /**
@@ -44,6 +45,11 @@ public class CoapTransmission {
     private static final int MAX_RETRANSMIT = 4;
 
     /**
+     * The unique transmission identifier
+     */
+    private String id;
+
+    /**
      * The CoAP message waiting to be acknowledged
      */
     private CoapMessage message;
@@ -64,7 +70,8 @@ public class CoapTransmission {
      */
     private long nextTimeout;
 
-    public CoapTransmission(CoapMessage message) {
+    public CoapTransmission(IoSession session, CoapMessage message) {
+        this.id = uniqueId(session, message);
         this.message = message;
 
         this.transmissionCount = 0;
@@ -88,6 +95,10 @@ public class CoapTransmission {
         return false;
     }
 
+    public String getId() {
+        return id;
+    }
+
     public CoapMessage getMessage() {
         return message;
     }
@@ -104,4 +115,11 @@ public class CoapTransmission {
         return nextTimeout;
     }
 
+    /**
+     * @return the unique identifier for a given message in a session.
+     */
+    public static String uniqueId(IoSession session, CoapMessage message) {
+        return session.getId() + "#" + message.requestId();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/mina/blob/cf49c470/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java
----------------------------------------------------------------------
diff --git a/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java b/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java
index 34c0e36..6c046ec 100644
--- a/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java
+++ b/coap/src/test/java/org/apache/mina/coap/retry/CoapRetryFilterTest.java
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.coap.retry;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 import org.apache.mina.api.IoSession;
@@ -130,7 +131,7 @@ public class CoapRetryFilterTest {
         verify(writeController).callWriteNextFilter(writeRequest);
 
         // no retry
-        Mockito.verifyZeroInteractions(session);
+        verify(session, Mockito.never()).write(any(CoapMessage.class));
     }
 
     @Test
@@ -152,7 +153,7 @@ public class CoapRetryFilterTest {
         verify(writeController).callWriteNextFilter(writeRequest);
 
         // no retry
-        Mockito.verifyZeroInteractions(session);
+        verify(session, Mockito.never()).write(any(CoapMessage.class));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/mina/blob/cf49c470/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java
----------------------------------------------------------------------
diff --git a/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java b/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java
index 1280885..c31cc48 100644
--- a/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java
+++ b/coap/src/test/java/org/apache/mina/coap/retry/CoapTransmissionTest.java
@@ -19,12 +19,13 @@
  */
 package org.apache.mina.coap.retry;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
+import org.apache.mina.api.IoSession;
 import org.apache.mina.coap.CoapMessage;
 import org.apache.mina.coap.MessageType;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Unit tests for {@link CoapTransmission}
@@ -36,8 +37,12 @@ public class CoapTransmissionTest {
 
     @Test
     public void timeout() {
-        CoapTransmission transmission = new CoapTransmission(new CoapMessage(1, MessageType.CONFIRMABLE, 1, 1234,
-                "token".getBytes(), null, "payload".getBytes()));
+
+        IoSession session = Mockito.mock(IoSession.class);
+        Mockito.when(session.getId()).thenReturn(1L);
+
+        CoapTransmission transmission = new CoapTransmission(session, new CoapMessage(1, MessageType.CONFIRMABLE, 1,
+                1234, "token".getBytes(), null, "payload".getBytes()));
 
         assertTrue(transmission.getNextTimeout() > MIN_INIT_TIMEOUT);
         assertTrue(transmission.getNextTimeout() < MAX_INIT_TIMEOUT);