You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/01/05 16:54:05 UTC
[1/2] activemq-artemis git commit: This closes #293
Repository: activemq-artemis
Updated Branches:
refs/heads/master 2838128c1 -> 94fb2c7b5
This closes #293
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/94fb2c7b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/94fb2c7b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/94fb2c7b
Branch: refs/heads/master
Commit: 94fb2c7b50cd59489cb1757ec0b0734041c62f92
Parents: 2838128 978f8ee
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jan 5 10:44:54 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jan 5 10:44:54 2016 -0500
----------------------------------------------------------------------
.../core/server/impl/ServerConsumerImpl.java | 9 +-
.../integration/client/LargeMessageTest.java | 150 +++++++++++++++++++
2 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-331 support 0-length large msg
Posted by cl...@apache.org.
ARTEMIS-331 support 0-length large msg
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/978f8eed
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/978f8eed
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/978f8eed
Branch: refs/heads/master
Commit: 978f8eeda8c1a597402363e62c1f26e2a5cce9b7
Parents: 2838128
Author: jbertram <jb...@apache.org>
Authored: Mon Jan 4 14:44:09 2016 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jan 5 10:44:54 2016 -0500
----------------------------------------------------------------------
.../core/server/impl/ServerConsumerImpl.java | 9 +-
.../integration/client/LargeMessageTest.java | 150 +++++++++++++++++++
2 files changed, 158 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/978f8eed/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 7d54d31..7936c76 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -1036,7 +1036,14 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
context.encode(bodyBuffer, localChunkLen);
- byte[] body = bodyBuffer.toByteBuffer().array();
+ byte[] body;
+
+ if (bodyBuffer.toByteBuffer().hasArray()) {
+ body = bodyBuffer.toByteBuffer().array();
+ }
+ else {
+ body = new byte[0];
+ }
int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/978f8eed/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index d99ba63..bc94ab3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2104,6 +2104,156 @@ public class LargeMessageTest extends LargeMessageTestBase {
}
}
+ // https://issues.apache.org/jira/browse/ARTEMIS-331
+ @Test
+ public void testSendStreamingSingleEmptyMessage() throws Exception {
+ final String propertyName = "myStringPropertyName";
+ final String propertyValue = "myStringPropertyValue";
+ ClientSession session = null;
+ ActiveMQServer server = null;
+
+ final int SIZE = 0;
+ try {
+
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ locator.setMinLargeMessageSize(100 * 1024);
+
+ ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientMessage clientFile = session.createMessage(true);
+ clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
+ clientFile.putStringProperty(propertyName, propertyValue);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ session.start();
+
+ log.debug("Sending");
+ producer.send(clientFile);
+
+ producer.close();
+
+ log.debug("Waiting");
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ ClientMessage msg2 = consumer.receive(10000);
+
+ msg2.acknowledge();
+
+ msg2.setOutputStream(createFakeOutputStream());
+ Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
+ Assert.assertEquals(propertyValue, msg2.getStringProperty(propertyName));
+
+ session.commit();
+
+ Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
+
+ }
+ finally {
+ try {
+ session.close();
+ }
+ catch (Throwable ignored) {
+ }
+
+ try {
+ server.stop();
+ }
+ catch (Throwable ignored) {
+ }
+ }
+ }
+
+ // https://issues.apache.org/jira/browse/ARTEMIS-331
+ @Test
+ public void testSendStreamingEmptyMessagesWithRestart() throws Exception {
+ final String propertyName = "myStringPropertyName";
+ final String propertyValue = "myStringPropertyValue";
+ ClientSession session = null;
+ ActiveMQServer server = null;
+
+ final int SIZE = 0;
+ try {
+
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ locator.setMinLargeMessageSize(100 * 1024);
+
+ ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < 10; i++) {
+ ClientMessage clientFile = session.createMessage(true);
+ clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
+ clientFile.putStringProperty(propertyName, propertyValue + i);
+ producer.send(clientFile);
+ }
+
+ producer.close();
+
+ session.close();
+
+ sf.close();
+
+ server.stop();
+
+ server.start();
+
+ sf = addSessionFactory(createSessionFactory(locator));
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++) {
+ ClientMessage msg2 = consumer.receive(10000);
+
+ msg2.acknowledge();
+
+ msg2.setOutputStream(createFakeOutputStream());
+ Assert.assertTrue(msg2.waitOutputStreamCompletion(60000));
+ Assert.assertEquals(propertyValue + i, msg2.getStringProperty(propertyName));
+
+ session.commit();
+ }
+
+ Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
+ Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(ADDRESS).getBindable())));
+
+ }
+ finally {
+ try {
+ session.close();
+ }
+ catch (Throwable ignored) {
+ }
+
+ try {
+ server.stop();
+ }
+ catch (Throwable ignored) {
+ }
+ }
+ }
+
/**
* Receive messages but never reads them, leaving the buffer pending
*/