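You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the word "here" in the original page; its target is not preserved in this plain text copy.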
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/10 00:09:00 UTC
svn commit: r384640 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/udp/CommandChannel.java
test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
Author: jstrachan
Date: Thu Mar 9 15:08:57 2006
New Revision: 384640
URL: http://svn.apache.org/viewcvs?rev=384640&view=rev
Log:
added test cases for UDP fragmentation
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384640&r1=384639&r2=384640&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Thu Mar 9 15:08:57 2006
@@ -207,10 +207,10 @@
boolean lastFragment = false;
for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
// write the header
- writeBuffer.rewind();
+ writeBuffer.clear();
int chunkSize = writeBuffer.capacity() - headerMarshaller.getHeaderSize(header);
lastFragment = offset + chunkSize >= length;
- if (lastFragment) {
+ if (chunkSize + offset > length) {
chunkSize = length - offset;
}
header.incrementCounter();
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=384640&r1=384639&r2=384640&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java Thu Mar 9 15:08:57 2006
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.transport.udp;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.WireFormatInfo;
@@ -25,6 +28,8 @@
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
+import javax.jms.MessageNotWriteableException;
+
import java.io.IOException;
import java.net.URI;
@@ -41,7 +46,8 @@
protected Object lock = new Object();
protected Command receivedCommand;
- private TransportServer server;
+ protected TransportServer server;
+ protected boolean large;
public void testSendingSmallMessage() throws Exception {
ConsumerInfo expected = new ConsumerInfo();
@@ -50,7 +56,7 @@
expected.setCommandId((short) 12);
expected.setExclusive(true);
expected.setPrefetchSize(3456);
-
+
try {
System.out.println("About to send: " + expected);
producer.oneway(expected);
@@ -70,6 +76,54 @@
}
}
+ public void testSendingMediumMessage() throws Exception {
+ String text = createMessageBodyText(4 * 105);
+ ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Medium");
+ assertSendTextMessage(destination, text);
+ }
+
+ public void testSendingLargeMessage() throws Exception {
+ String text = createMessageBodyText(4 * 1024);
+ ActiveMQDestination destination = new ActiveMQQueue("Foo.Bar.Large");
+ assertSendTextMessage(destination, text);
+ }
+
+ protected void assertSendTextMessage(ActiveMQDestination destination, String text) throws MessageNotWriteableException {
+ large = true;
+
+ ActiveMQTextMessage expected = new ActiveMQTextMessage();
+
+ expected.setText(text);
+ expected.setDestination(destination);
+
+ try {
+ System.out.println("About to send message of type: " + expected.getClass());
+ producer.oneway(expected);
+
+ Command received = assertCommandReceived();
+ assertTrue("Should have received a ActiveMQTextMessage but was: " + received, received instanceof ActiveMQTextMessage);
+ ActiveMQTextMessage actual = (ActiveMQTextMessage) received;
+
+ assertEquals("getDestination", expected.getDestination(), actual.getDestination());
+ assertEquals("getText", expected.getText(), actual.getText());
+
+ System.out.println("Received text message with: " + actual.getText().length() + " character(s)");
+ }
+ catch (Exception e) {
+ System.out.println("Caught: " + e);
+ e.printStackTrace();
+ fail("Failed to send to transport: " + e);
+ }
+ }
+
+ protected String createMessageBodyText(int loopSize) {
+ StringBuffer buffer = new StringBuffer();
+ for (int i = 0; i < loopSize; i++) {
+ buffer.append("0123456789");
+ }
+ return buffer.toString();
+ }
+
protected void setUp() throws Exception {
server = createServer();
if (server != null) {
@@ -133,7 +187,12 @@
System.out.println("Got WireFormatInfo: " + command);
}
else {
- System.out.println("### Received command: " + command);
+ if (large) {
+ System.out.println("### Received command: " + command.getClass() + " with id: " + command.getCommandId());
+ }
+ else {
+ System.out.println("### Received command: " + command);
+ }
synchronized (lock) {
receivedCommand = command;
@@ -157,7 +216,10 @@
protected Command assertCommandReceived() throws InterruptedException {
Command answer = null;
synchronized (lock) {
- lock.wait(1000);
+ answer = receivedCommand;
+ if (answer == null) {
+ lock.wait(10000);
+ }
answer = receivedCommand;
}