You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/13 10:52:21 UTC
svn commit: r385480 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/reliable/
test/java/org/apache/activemq/transport/
Author: jstrachan
Date: Mon Mar 13 01:52:15 2006
New Revision: 385480
URL: http://svn.apache.org/viewcvs?rev=385480&view=rev
Log:
updated the reliable transport to be able to deal with out of order messages within a certain range and discarding duplicates within a range
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java?rev=385480&r1=385479&r2=385480&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java Mon Mar 13 01:52:15 2006
@@ -28,11 +28,15 @@
private int maximumDifference = 5;
- public void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
- long count = Math.abs(actualCounter - expectedCounter);
+ public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
+ int difference = actualCounter - expectedCounter;
+ long count = Math.abs(difference);
if (count > maximumDifference) {
throw new IOException("Packets dropped on: " + transport + " count: " + count + " expected: " + expectedCounter + " but was: " + actualCounter);
}
+
+ // lets discard old commands
+ return difference > 0;
}
public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?rev=385480&r1=385479&r2=385480&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Mon Mar 13 01:52:15 2006
@@ -47,11 +47,13 @@
if (!valid) {
synchronized (commands) {
- // lets add it to the list for later on
- commands.add(command);
-
try {
- replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
+ boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
+
+ if (keep) {
+ // lets add it to the list for later on
+ commands.add(command);
+ }
}
catch (IOException e) {
getTransportListener().onException(e);
@@ -89,6 +91,23 @@
}
}
}
+ }
+
+ public int getBufferedCommandCount() {
+ synchronized (commands) {
+ return commands.size();
+ }
+ }
+
+ public int getExpectedCounter() {
+ return expectedCounter;
+ }
+
+ /**
+ * This property should never really be set - but is mutable primarily for test cases
+ */
+ public void setExpectedCounter(int expectedCounter) {
+ this.expectedCounter = expectedCounter;
}
public String toString() {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java?rev=385480&r1=385479&r2=385480&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java Mon Mar 13 01:52:15 2006
@@ -26,7 +26,15 @@
*/
public interface ReplayStrategy {
- void onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException;
+ /**
+ * Deals with a dropped packet.
+ *
+ * @param transport the transport on which the packet was dropped
+ * @param expectedCounter the expected command counter
+ * @param actualCounter the actual command counter
+ * @return true if the command should be buffered or false if it should be discarded
+ */
+ boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException;
void onReceivedPacket(ReliableTransport transport, long expectedCounter);
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java?rev=385480&r1=385479&r2=385480&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java Mon Mar 13 01:52:15 2006
@@ -31,7 +31,7 @@
*/
public class ReliableTransportTest extends TestCase {
- protected TransportFilter transport;
+ protected ReliableTransport transport;
protected StubTransportListener listener = new StubTransportListener();
protected ReplayStrategy replayStrategy;
@@ -41,12 +41,40 @@
sendStreamOfCommands(sequenceNumbers, true);
}
+ public void testValidWrapAroundPackets() throws Exception {
+ int[] sequenceNumbers = new int[10];
+
+ int value = Integer.MAX_VALUE - 3;
+ transport.setExpectedCounter(value);
+
+ for (int i = 0; i < 10; i++) {
+ System.out.println("command: " + i + " = " + value);
+ sequenceNumbers[i] = value++;
+ }
+
+ sendStreamOfCommands(sequenceNumbers, true);
+ }
+
public void testDuplicatePacketsDropped() throws Exception {
int[] sequenceNumbers = { 1, 2, 2, 3, 4, 5, 6, 7 };
sendStreamOfCommands(sequenceNumbers, true, 7);
}
+ public void testOldDuplicatePacketsDropped() throws Exception {
+ int[] sequenceNumbers = { 1, 2, 3, 4, 5, 2, 6, 7 };
+
+ sendStreamOfCommands(sequenceNumbers, true, 7);
+ }
+
+ public void testOldDuplicatePacketsDroppedUsingNegativeCounters() throws Exception {
+ int[] sequenceNumbers = { -3, -1, -3, -2, -1, 0, 1, -1, 3, 2, 0, 2, 4 };
+
+ transport.setExpectedCounter(-3);
+
+ sendStreamOfCommands(sequenceNumbers, true, 8);
+ }
+
public void testWrongOrderOfPackets() throws Exception {
int[] sequenceNumbers = { 4, 3, 1, 5, 2, 7, 6, 8, 10, 9 };
@@ -83,13 +111,14 @@
fail("Caught exception: " + e);
}
assertEquals("number of messages received", expectedCount, commands.size());
- }
+
+ assertEquals("Should have no buffered commands", 0, transport.getBufferedCommandCount());
+ }
else {
assertTrue("Should have received an exception!", exceptions.size() > 0);
Exception e = (Exception) exceptions.remove();
System.out.println("Caught expected response: " + e);
}
-
}
protected void setUp() throws Exception {