You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/13 10:17:48 UTC

svn commit: r385478 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/reliable/ main/java/org/apache/activemq/transport/replay/ main/java/org/apache/activemq/transport/udp...

Author: jstrachan
Date: Mon Mar 13 01:17:43 2006
New Revision: 385478

URL: http://svn.apache.org/viewcvs?rev=385478&view=rev
Log:
Refactored the reliable transport into the reliable package and added a test case demonstrating that the transport handles duplicate packets and copes with packet reordering.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java   (contents, props changed)
      - copied, changed from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java   (contents, props changed)
      - copied, changed from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java   (contents, props changed)
      - copied, changed from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java (from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java&r1=385456&r2=385478&rev=385478&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ExceptionIfDroppedReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java Mon Mar 13 01:17:43 2006
@@ -14,25 +14,40 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.replay;
-
-import org.apache.activemq.transport.ReliableTransport;
+package org.apache.activemq.transport.reliable;
 
 import java.io.IOException;
 
 /**
- * Throws an exception if packets are dropped causing the transport to be closed.
+ * Throws an exception if packets are dropped causing the transport to be
+ * closed.
  * 
  * @version $Revision$
  */
 public class ExceptionIfDroppedReplayStrategy implements ReplayStrategy {
 
-    public void onDroppedPackets(ReliableTransport transport, long expectedCounter, long actualCounter) throws IOException {
-        long count = actualCounter - expectedCounter;
-        throw new IOException("Packets dropped on: " + transport + " count: " + count +  " expected: " + expectedCounter + " but was: " + actualCounter);
+    private int maximumDifference = 5;
+
+    public void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
+        long count = Math.abs(actualCounter - expectedCounter);
+        if (count > maximumDifference) {
+            throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
+        }
     }
 
     public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
+    }
+
+    public int getMaximumDifference() {
+        return maximumDifference;
+    }
+
+    /**
+     * Sets the maximum allowed difference between an expected packet and an
+     * actual packet before an error occurs
+     */
+    public void setMaximumDifference(int maximumDifference) {
+        this.maximumDifference = maximumDifference;
     }
 
 }

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java&r1=385456&r2=385478&rev=385478&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/ReliableTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Mon Mar 13 01:17:43 2006
@@ -14,13 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport;
+package org.apache.activemq.transport.reliable;
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.openwire.CommandIdComparator;
-import org.apache.activemq.transport.replay.ReplayStrategy;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
 
 import java.io.IOException;
 import java.util.SortedSet;
@@ -33,10 +32,8 @@
  * @version $Revision$
  */
 public class ReliableTransport extends TransportFilter {
-    private static final Log log = LogFactory.getLog(ReliableTransport.class);
-
     private ReplayStrategy replayStrategy;
-    private SortedSet headers = new TreeSet(new CommandIdComparator());
+    private SortedSet commands = new TreeSet(new CommandIdComparator());
     private int expectedCounter = 1;
 
     public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
@@ -49,28 +46,48 @@
         boolean valid = expectedCounter == actualCounter;
 
         if (!valid) {
-            // lets add it to the list for later on
-            headers.add(command);
-
-            try {
-                replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
-            }
-            catch (IOException e) {
-                getTransportListener().onException(e);
-            }
-
-            if (!headers.isEmpty()) {
-                // lets see if the first item in the set is the next header
-                command = (Command) headers.first();
-                valid = expectedCounter == command.getCommandId();
+            synchronized (commands) {
+                // lets add it to the list for later on
+                commands.add(command);
+
+                try {
+                    replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
+                }
+                catch (IOException e) {
+                    getTransportListener().onException(e);
+                }
+
+                if (!commands.isEmpty()) {
+                    // lets see if the first item in the set is the next
+                    // expected
+                    command = (Command) commands.first();
+                    valid = expectedCounter == command.getCommandId();
+                    if (valid) {
+                        commands.remove(command);
+                    }
+                }
             }
         }
 
-        if (valid) {
+        while (valid) {
             // we've got a valid header so increment counter
             replayStrategy.onReceivedPacket(this, expectedCounter);
             expectedCounter++;
             getTransportListener().onCommand(command);
+
+            synchronized (commands) {
+                // we could have more commands left
+                valid = !commands.isEmpty();
+                if (valid) {
+                    // lets see if the first item in the set is the next
+                    // expected
+                    command = (Command) commands.first();
+                    valid = expectedCounter == command.getCommandId();
+                    if (valid) {
+                        commands.remove(command);
+                    }
+                }
+            }
         }
     }
 

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Copied: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java (from r385456, incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java)
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java?p2=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java&p1=incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java&r1=385456&r2=385478&rev=385478&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/replay/ReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java Mon Mar 13 01:17:43 2006
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.activemq.transport.replay;
+package org.apache.activemq.transport.reliable;
 
-import org.apache.activemq.transport.ReliableTransport;
 
 import java.io.IOException;
 
@@ -27,7 +26,7 @@
  */
 public interface ReplayStrategy {
 
-    void onDroppedPackets(ReliableTransport transport, long expectedCounter, long actualCounter) throws IOException;
+    void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException;
 
     void onReceivedPacket(ReliableTransport transport, long expectedCounter);
 

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=385478&r1=385477&r2=385478&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Mon Mar 13 01:17:43 2006
@@ -22,8 +22,8 @@
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy;
-import org.apache.activemq.transport.replay.ReplayStrategy;
+import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
+import org.apache.activemq.transport.reliable.ReplayStrategy;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java?rev=385478&r1=385477&r2=385478&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java Mon Mar 13 01:17:43 2006
@@ -19,8 +19,9 @@
 import edu.emory.mathcs.backport.java.util.Queue;
 
 import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy;
-import org.apache.activemq.transport.replay.ReplayStrategy;
+import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
+import org.apache.activemq.transport.reliable.ReliableTransport;
+import org.apache.activemq.transport.reliable.ReplayStrategy;
 
 import junit.framework.TestCase;
 
@@ -36,17 +37,33 @@
 
     public void testValidSequenceOfPackets() throws Exception {
         int[] sequenceNumbers = { 1, 2, 3, 4, 5, 6, 7 };
+        
+        sendStreamOfCommands(sequenceNumbers, true);
+    }
+    
+    public void testDuplicatePacketsDropped() throws Exception {
+        int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 };
+        
+        sendStreamOfCommands(sequenceNumbers, true, 7);
+    }
+    
+    public void testWrongOrderOfPackets() throws Exception {
+        int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 };
 
         sendStreamOfCommands(sequenceNumbers, true);
     }
 
-    public void testInvalidSequenceOfPackets() throws Exception {
-        int[] sequenceNumbers = { 1, 2, /* 3, */  4, 5, 6, 7 };
+    public void testMissingPacketsFails() throws Exception {
+        int[] sequenceNumbers = { 1, 2, /* 3, */  4, 5, 6, 7, 8, 9, 10 };
 
         sendStreamOfCommands(sequenceNumbers, false);
     }
 
     protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected) {
+        sendStreamOfCommands(sequenceNumbers, expected, sequenceNumbers.length);
+    }
+    
+    protected void sendStreamOfCommands(int[] sequenceNumbers, boolean expected, int expectedCount) {
         for (int i = 0; i < sequenceNumbers.length; i++) {
             int commandId = sequenceNumbers[i];
             
@@ -65,7 +82,7 @@
                 e.printStackTrace();
                 fail("Caught exception: " + e);
             }
-            assertEquals("number of messages received", sequenceNumbers.length, commands.size());
+            assertEquals("number of messages received", expectedCount, commands.size());
         }
         else {
             assertTrue("Should have received an exception!", exceptions.size() > 0);