You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/10 00:09:00 UTC

svn commit: r384640 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/udp/CommandChannel.java test/java/org/apache/activemq/transport/udp/UdpTestSupport.java

Author: jstrachan
Date: Thu Mar  9 15:08:57 2006
New Revision: 384640

URL: http://svn.apache.org/viewcvs?rev=384640&view=rev
Log:
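Added test cases for UDP fragmentation (medium and large text messages); CommandChannel now clears the write buffer before writing each fragment and trims the chunk size only when it would run past the end of the data.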

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384640&r1=384639&r2=384640&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Thu Mar  9 15:08:57 2006
@@ -207,10 +207,10 @@
                 boolean lastFragment = false;
                 for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
                     // write the header
-                    writeBuffer.rewind();
+                    writeBuffer.clear();
                     int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
                     lastFragment = offset + chunkSize >= length;
-                    if (lastFragment) {
+                    if (chunkSize + offset > length) {
                         chunkSize = length - offset;
                     }
                     header.incrementCounter();

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=384640&r1=384639&r2=384640&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java Thu Mar  9 15:08:57 2006
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.udp;
 
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.WireFormatInfo;
@@ -25,6 +28,8 @@
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.TransportServer;
 
+import javax.jms.MessageNotWriteableException;
+
 import java.io.IOException;
 import java.net.URI;
 
@@ -41,7 +46,8 @@
 
     protected Object lock = new Object();
     protected Command receivedCommand;
-    private TransportServer server;
+    protected TransportServer server;
+    protected boolean large;
 
     public void testSendingSmallMessage() throws Exception {
         ConsumerInfo expected = new ConsumerInfo();
@@ -50,7 +56,7 @@
         expected.setCommandId((short) 12);
         expected.setExclusive(true);
         expected.setPrefetchSize(3456);
-        
+
         try {
             System.out.println("About to send: " + expected);
             producer.oneway(expected);
@@ -70,6 +76,54 @@
         }
     }
 
+    public void testSendingMediumMessage() throws Exception {
+        String text = createMessageBodyText(4 * 105);
+        ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Medium");
+        assertSendTextMessage(destination, text);
+    }
+
+    public void testSendingLargeMessage() throws Exception {
+        String text = createMessageBodyText(4 * 1024);
+        ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Large");
+        assertSendTextMessage(destination, text);
+    }
+
+    protected void assertSendTextMessage(ActiveMQDestination destination, String text) throws MessageNotWriteableException {
+        large = true;
+
+        ActiveMQTextMessage expected = new ActiveMQTextMessage();
+
+        expected.setText(text);
+        expected.setDestination(destination);
+
+        try {
+            System.out.println("About to send message of type: " + expected.getClass());
+            producer.oneway(expected);
+
+            Command received = assertCommandReceived();
+            assertTrue("Should have received a ActiveMQTextMessage but was: " + received, received instanceof ActiveMQTextMessage);
+            ActiveMQTextMessage actual = (ActiveMQTextMessage) received;
+
+            assertEquals("getDestination", expected.getDestination(), actual.getDestination());
+            assertEquals("getText", expected.getText(), actual.getText());
+            
+            System.out.println("Received text message with: " + actual.getText().length() + " character(s)");
+        }
+        catch (Exception e) {
+            System.out.println("Caught: " + e);
+            e.printStackTrace();
+            fail("Failed to send to transport: " + e);
+        }
+    }
+
+    protected String createMessageBodyText(int loopSize) {
+        StringBuffer buffer = new StringBuffer();
+        for (int i = 0; i < loopSize; i++) {
+            buffer.append("0123456789");
+        }
+        return buffer.toString();
+    }
+
     protected void setUp() throws Exception {
         server = createServer();
         if (server != null) {
@@ -133,7 +187,12 @@
             System.out.println("Got WireFormatInfo: " + command);
         }
         else {
-            System.out.println("### Received command: " + command);
+            if (large) {
+                System.out.println("### Received command: " + command.getClass() + " with id: " + command.getCommandId());
+            }
+            else {
+                System.out.println("### Received command: " + command);
+            }
 
             synchronized (lock) {
                 receivedCommand = command;
@@ -157,7 +216,10 @@
     protected Command assertCommandReceived() throws InterruptedException {
         Command answer = null;
         synchronized (lock) {
-            lock.wait(1000);
+            answer = receivedCommand;
+            if (answer == null) {
+                lock.wait(10000);
+            }
             answer = receivedCommand;
         }