You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/11 14:58:46 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1268 Fix LargeMessages over STOMP
ARTEMIS-1268 Fix LargeMessages over STOMP
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/905098bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/905098bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/905098bc
Branch: refs/heads/master
Commit: 905098bc4e9c26830239517fc35a696dab2a5b7a
Parents: f8554c7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Jul 10 16:38:37 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 11 10:58:40 2017 -0400
----------------------------------------------------------------------
.../core/protocol/stomp/StompSession.java | 5 +-
.../stomp/VersionedStompFrameHandler.java | 2 -
.../tests/integration/stomp/StompTest.java | 54 +++++++++++++++-----
.../tests/integration/stomp/StompTestBase.java | 3 ++
4 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 797a966..03b5757 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -389,8 +389,9 @@ public class StompSession implements SessionCallback {
long id = storageManager.generateID();
LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
- byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
- message.getBodyBuffer().readBytes(bytes);
+ ActiveMQBuffer body = message.getReadOnlyBodyBuffer();
+ byte[] bytes = new byte[body.readableBytes()];
+ body.readBytes(bytes);
largeMessage.addBytes(bytes);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index c5fc8f1..df6d9b0 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -299,8 +299,6 @@ public abstract class VersionedStompFrameHandler {
ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
- int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
-
int size = buffer.writerIndex();
byte[] data = new byte[size];
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index f023cfb..c2f1964 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.stomp;
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
@@ -26,13 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -159,12 +158,16 @@ public class StompTest extends StompTestBase {
.setMaxUsage(0)
.tick();
- for (int i = 1; i <= count; i++) {
- // Thread.sleep(1);
- // log.info(">>> " + i);
- send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
+ // Connection should be closed by broker when disk is full and attempt to send
+ Exception e = null;
+ try {
+ for (int i = 1; i <= count; i++) {
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
+ }
+ } catch (Exception se) {
+ e = se;
}
-
+ assertNotNull(e);
// It should encounter the exception on logs
AssertionLoggerHandler.findText("AMQ119119");
} finally {
@@ -255,6 +258,33 @@ public class StompTest extends StompTestBase {
}
@Test
+ public void testSendReceiveLargeMessage() throws Exception {
+ String address = "testLargeMessageAddress";
+ server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);
+
+ // STOMP default is UTF-8 == 1 byte per char.
+ int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
+ StringBuilder b = new StringBuilder(largeMessageStringSize);
+ for (int i = 0; i < largeMessageStringSize; i++) {
+ b.append('t');
+ }
+ String payload = b.toString();
+
+ // Set up STOMP subscription
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);
+
+ // Send Large Message
+ System.out.println("Sending Message Size: " + largeMessageStringSize);
+ send(conn, address, null, payload);
+
+ // Receive STOMP Message
+ ClientStompFrame frame = conn.receiveFrame();
+ System.out.println(frame.getBody().length());
+ assertTrue(frame.getBody().equals(payload));
+ }
+
+ @Test
public void sendMQTTReceiveSTOMP() throws Exception {
String payload = "This is a test message";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index f885659..4e84857 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -137,6 +137,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
connection.start();
}
+
/**
* @return
* @throws Exception
@@ -168,6 +169,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
}
+ config.setPersistenceEnabled(true);
+
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
if (isSecurityEnabled()) {