You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/10/02 19:20:36 UTC
activemq git commit: AMQ-7065 Update to Qpid JMS v0.37.0
Repository: activemq
Updated Branches:
refs/heads/master 524615128 -> ac1e709dc
AMQ-7065 Update to Qpid JMS v0.37.0
Update to latest client release, adds some tests for split framed
message send / receive
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ac1e709d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ac1e709d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ac1e709d
Branch: refs/heads/master
Commit: ac1e709dc4ed419c8d789fc29e970fde6b796ed1
Parents: 5246151
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 2 15:20:11 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 2 15:20:11 2018 -0400
----------------------------------------------------------------------
.../amqp/JMSLargeMessageSendRecvTest.java | 124 +++++++++++++++----
.../transport/amqp/client/AmqpSender.java | 3 +-
pom.xml | 2 +-
3 files changed, 104 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ac1e709d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
index 20ad2d9..7f96afc 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
+ protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
+
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@@ -55,46 +59,38 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
});
}
- public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
- super(connectorScheme, secure);
- }
-
@Rule
public TestName testName = new TestName();
- protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
-
- private String createLargeString(int sizeInBytes) {
- byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < sizeInBytes; i++) {
- builder.append(base[i % base.length]);
- }
+ public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
+ super(connectorScheme, secure);
+ }
- LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
- return builder.toString();
+ @Test(timeout = 60 * 1000)
+ public void testSendSmallerTextMessage() throws JMSException {
+ doTestSendTextMessageOfGivenSize(1024);
}
@Test(timeout = 60 * 1000)
- public void testSendSmallerMessages() throws JMSException {
+ public void testSendSeriesOfSmallerTextMessages() throws JMSException {
for (int i = 512; i <= (8 * 1024); i += 512) {
- doTestSendLargeMessage(i);
+ doTestSendTextMessageOfGivenSize(i);
}
}
@Test(timeout = 60 * 1000)
- public void testSendFixedSizedMessages() throws JMSException {
- doTestSendLargeMessage(65536);
- doTestSendLargeMessage(65536 * 2);
- doTestSendLargeMessage(65536 * 4);
+ public void testSendFixedSizedTextMessages() throws JMSException {
+ doTestSendTextMessageOfGivenSize(65536);
+ doTestSendTextMessageOfGivenSize(65536 * 2);
+ doTestSendTextMessageOfGivenSize(65536 * 4);
}
@Test(timeout = 60 * 1000)
- public void testSendHugeMessage() throws JMSException {
- doTestSendLargeMessage(1024 * 1024 * 10);
+ public void testSendHugeTextMessage() throws JMSException {
+ doTestSendTextMessageOfGivenSize(1024 * 1024 * 5);
}
- public void doTestSendLargeMessage(int expectedSize) throws JMSException{
+ public void doTestSendTextMessageOfGivenSize(int expectedSize) throws JMSException{
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
String payload = createLargeString(expectedSize);
assertEquals(expectedSize, payload.getBytes().length);
@@ -126,4 +122,86 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
assertEquals(payload, receivedText);
connection.close();
}
+
+ @Test(timeout = 60 * 1000)
+ public void testSendSmallerBytesMessage() throws JMSException {
+ doTestSendBytesMessageOfGivenSize(1024);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendSeriesOfSmallerBytesMessages() throws JMSException {
+ for (int i = 512; i <= (8 * 1024); i += 512) {
+ doTestSendBytesMessageOfGivenSize(i);
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendFixedSizedBytesMessages() throws JMSException {
+ doTestSendBytesMessageOfGivenSize(65536);
+ doTestSendBytesMessageOfGivenSize(65536 * 2);
+ doTestSendBytesMessageOfGivenSize(65536 * 4);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendHugeBytesMessage() throws JMSException {
+ doTestSendBytesMessageOfGivenSize(1024 * 1024 * 5);
+ }
+
+ public void doTestSendBytesMessageOfGivenSize(int expectedSize) throws JMSException{
+ LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
+ byte[] payload = createLargeByteArray(expectedSize);
+ assertEquals(expectedSize, payload.length);
+
+ Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
+ long startTime = System.currentTimeMillis();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(testName.getMethodName());
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(payload);
+ producer.send(message);
+ long endTime = System.currentTimeMillis();
+ LOG.info("Returned from send after {} ms", endTime - startTime);
+
+ startTime = System.currentTimeMillis();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ connection.start();
+ LOG.info("Calling receive");
+ Message receivedMessage = consumer.receive();
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof BytesMessage);
+ BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
+ assertNotNull(receivedMessage);
+ endTime = System.currentTimeMillis();
+ LOG.info("Returned from receive after {} ms", endTime - startTime);
+ byte[] receivedBytes = new byte[(int) receivedBytesMessage.getBodyLength()];
+ receivedBytesMessage.readBytes(receivedBytes);
+ assertEquals(expectedSize, receivedBytes.length);
+ assertArrayEquals(payload, receivedBytes);
+ connection.close();
+ }
+
+ private String createLargeString(int sizeInBytes) {
+ byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < sizeInBytes; i++) {
+ builder.append(base[i % base.length]);
+ }
+
+ LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
+ return builder.toString();
+ }
+
+ private byte[] createLargeByteArray(int sizeInBytes) {
+ byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+
+ byte[] payload = new byte[sizeInBytes];
+ for (int i = 0; i < sizeInBytes; i++) {
+ payload[i] = (base[i % base.length]);
+ }
+
+ LOG.debug("Created byte array with size : " + payload.length + " bytes");
+ return payload;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ac1e709d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index db487bd..589a328 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
+import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
@@ -457,7 +458,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
try {
encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
break;
- } catch (java.nio.BufferOverflowException e) {
+ } catch (BufferOverflowException | IndexOutOfBoundsException e) {
encodeBuffer = new byte[encodeBuffer.length * 2];
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ac1e709d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fccb18f..ebf0d4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.29.0</qpid-proton-version>
- <qpid-jms-version>0.36.0</qpid-jms-version>
+ <qpid-jms-version>0.37.0</qpid-jms-version>
<qpid-jms-netty-version>4.1.28.Final</qpid-jms-netty-version>
<qpid-jms-proton-version>0.29.0</qpid-jms-proton-version>
<netty-all-version>4.1.28.Final</netty-all-version>