You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/10/02 19:20:36 UTC

activemq git commit: AMQ-7065 Update to Qpid JMS v0.37.0

Repository: activemq
Updated Branches:
  refs/heads/master 524615128 -> ac1e709dc


AMQ-7065 Update to Qpid JMS v0.37.0

Update to the latest client release and add tests for sending and
receiving messages that are split across multiple frames.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ac1e709d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ac1e709d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ac1e709d

Branch: refs/heads/master
Commit: ac1e709dc4ed419c8d789fc29e970fde6b796ed1
Parents: 5246151
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 2 15:20:11 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 2 15:20:11 2018 -0400

----------------------------------------------------------------------
 .../amqp/JMSLargeMessageSendRecvTest.java       | 124 +++++++++++++++----
 .../transport/amqp/client/AmqpSender.java       |   3 +-
 pom.xml                                         |   2 +-
 3 files changed, 104 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ac1e709d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
index 20ad2d9..7f96afc 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSLargeMessageSendRecvTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.Arrays;
 import java.util.Collection;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
 @RunWith(Parameterized.class)
 public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
 
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
+
     @Parameters(name="{0}")
     public static Collection<Object[]> data() {
         return Arrays.asList(new Object[][] {
@@ -55,46 +59,38 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
         });
     }
 
-    public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
-        super(connectorScheme, secure);
-    }
-
     @Rule
     public TestName testName = new TestName();
 
-    protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
-
-    private String createLargeString(int sizeInBytes) {
-        byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
-        StringBuilder builder = new StringBuilder();
-        for (int i = 0; i < sizeInBytes; i++) {
-            builder.append(base[i % base.length]);
-        }
+    public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
+        super(connectorScheme, secure);
+    }
 
-        LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
-        return builder.toString();
+    @Test(timeout = 60 * 1000)
+    public void testSendSmallerTextMessage() throws JMSException {
+        doTestSendTextMessageOfGivenSize(1024);
     }
 
     @Test(timeout = 60 * 1000)
-    public void testSendSmallerMessages() throws JMSException {
+    public void testSendSeriesOfSmallerTextMessages() throws JMSException {
         for (int i = 512; i <= (8 * 1024); i += 512) {
-            doTestSendLargeMessage(i);
+            doTestSendTextMessageOfGivenSize(i);
         }
     }
 
     @Test(timeout = 60 * 1000)
-    public void testSendFixedSizedMessages() throws JMSException {
-        doTestSendLargeMessage(65536);
-        doTestSendLargeMessage(65536 * 2);
-        doTestSendLargeMessage(65536 * 4);
+    public void testSendFixedSizedTextMessages() throws JMSException {
+        doTestSendTextMessageOfGivenSize(65536);
+        doTestSendTextMessageOfGivenSize(65536 * 2);
+        doTestSendTextMessageOfGivenSize(65536 * 4);
     }
 
     @Test(timeout = 60 * 1000)
-    public void testSendHugeMessage() throws JMSException {
-        doTestSendLargeMessage(1024 * 1024 * 10);
+    public void testSendHugeTextMessage() throws JMSException {
+        doTestSendTextMessageOfGivenSize(1024 * 1024 * 5);
     }
 
-    public void doTestSendLargeMessage(int expectedSize) throws JMSException{
+    public void doTestSendTextMessageOfGivenSize(int expectedSize) throws JMSException{
         LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
         String payload = createLargeString(expectedSize);
         assertEquals(expectedSize, payload.getBytes().length);
@@ -126,4 +122,86 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
         assertEquals(payload, receivedText);
         connection.close();
     }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendSmallerBytesMessage() throws JMSException {
+        doTestSendBytesMessageOfGivenSize(1024);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendSeriesOfSmallerBytesMessages() throws JMSException {
+        for (int i = 512; i <= (8 * 1024); i += 512) {
+            doTestSendBytesMessageOfGivenSize(i);
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendFixedSizedBytesMessages() throws JMSException {
+        doTestSendBytesMessageOfGivenSize(65536);
+        doTestSendBytesMessageOfGivenSize(65536 * 2);
+        doTestSendBytesMessageOfGivenSize(65536 * 4);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendHugeBytesMessage() throws JMSException {
+        doTestSendBytesMessageOfGivenSize(1024 * 1024 * 5);
+    }
+
+    public void doTestSendBytesMessageOfGivenSize(int expectedSize) throws JMSException{
+        LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
+        byte[] payload = createLargeByteArray(expectedSize);
+        assertEquals(expectedSize, payload.length);
+
+        Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
+        long startTime = System.currentTimeMillis();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(testName.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        BytesMessage message = session.createBytesMessage();
+        message.writeBytes(payload);
+        producer.send(message);
+        long endTime = System.currentTimeMillis();
+        LOG.info("Returned from send after {} ms", endTime - startTime);
+
+        startTime = System.currentTimeMillis();
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        connection.start();
+        LOG.info("Calling receive");
+        Message receivedMessage = consumer.receive();
+        assertNotNull(receivedMessage);
+        assertTrue(receivedMessage instanceof BytesMessage);
+        BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
+        assertNotNull(receivedMessage);
+        endTime = System.currentTimeMillis();
+        LOG.info("Returned from receive after {} ms", endTime - startTime);
+        byte[] receivedBytes = new byte[(int) receivedBytesMessage.getBodyLength()];
+        receivedBytesMessage.readBytes(receivedBytes);
+        assertEquals(expectedSize, receivedBytes.length);
+        assertArrayEquals(payload, receivedBytes);
+        connection.close();
+    }
+
+    private String createLargeString(int sizeInBytes) {
+        byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < sizeInBytes; i++) {
+            builder.append(base[i % base.length]);
+        }
+
+        LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
+        return builder.toString();
+    }
+
+    private byte[] createLargeByteArray(int sizeInBytes) {
+        byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
+
+        byte[] payload = new byte[sizeInBytes];
+        for (int i = 0; i < sizeInBytes; i++) {
+            payload[i] = (base[i % base.length]);
+        }
+
+        LOG.debug("Created byte array with size : " + payload.length + " bytes");
+        return payload;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac1e709d/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index db487bd..589a328 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.amqp.client;
 
 import java.io.IOException;
+import java.nio.BufferOverflowException;
 import java.util.ArrayList;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -457,7 +458,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
             try {
                 encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
                 break;
-            } catch (java.nio.BufferOverflowException e) {
+            } catch (BufferOverflowException | IndexOutOfBoundsException e) {
                 encodeBuffer = new byte[encodeBuffer.length * 2];
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ac1e709d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fccb18f..ebf0d4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@
     <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
     <zookeeper-version>3.4.6</zookeeper-version>
     <qpid-proton-version>0.29.0</qpid-proton-version>
-    <qpid-jms-version>0.36.0</qpid-jms-version>
+    <qpid-jms-version>0.37.0</qpid-jms-version>
     <qpid-jms-netty-version>4.1.28.Final</qpid-jms-netty-version>
     <qpid-jms-proton-version>0.29.0</qpid-jms-proton-version>
     <netty-all-version>4.1.28.Final</netty-all-version>