You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/07/11 14:58:46 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1268 Fix LargeMessages over STOMP

ARTEMIS-1268 Fix LargeMessages over STOMP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/905098bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/905098bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/905098bc

Branch: refs/heads/master
Commit: 905098bc4e9c26830239517fc35a696dab2a5b7a
Parents: f8554c7
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Jul 10 16:38:37 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 11 10:58:40 2017 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompSession.java       |  5 +-
 .../stomp/VersionedStompFrameHandler.java       |  2 -
 .../tests/integration/stomp/StompTest.java      | 54 +++++++++++++++-----
 .../tests/integration/stomp/StompTestBase.java  |  3 ++
 4 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 797a966..03b5757 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -389,8 +389,9 @@ public class StompSession implements SessionCallback {
       long id = storageManager.generateID();
       LargeServerMessage largeMessage = storageManager.createLargeMessage(id, message);
 
-      byte[] bytes = new byte[message.getBodyBuffer().writerIndex() - CoreMessage.BODY_OFFSET];
-      message.getBodyBuffer().readBytes(bytes);
+      ActiveMQBuffer body = message.getReadOnlyBodyBuffer();
+      byte[] bytes = new byte[body.readableBytes()];
+      body.readBytes(bytes);
 
       largeMessage.addBytes(bytes);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index c5fc8f1..df6d9b0 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -299,8 +299,6 @@ public abstract class VersionedStompFrameHandler {
 
       ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer();
 
-      int bodyPos = (serverMessage).getEndOfBodyPosition() == -1 ? buffer.writerIndex() : (serverMessage).getEndOfBodyPosition();
-
       int size = buffer.writerIndex();
 
       byte[] data = new byte[size];

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index f023cfb..c2f1964 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.stomp;
 
+import javax.jms.BytesMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.TextMessage;
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashSet;
@@ -26,13 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import javax.jms.BytesMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.TextMessage;
-
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -159,12 +158,16 @@ public class StompTest extends StompTestBase {
                                                           .setMaxUsage(0)
                                                           .tick();
 
-         for (int i = 1; i <= count; i++) {
-            // Thread.sleep(1);
-            // log.info(">>> " + i);
-            send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
+         // Connection should be closed by broker when disk is full and attempt to send
+         Exception e = null;
+         try {
+            for (int i = 1; i <= count; i++) {
+               send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
+            }
+         } catch (Exception se) {
+            e = se;
          }
-
+         assertNotNull(e);
          // It should encounter the exception on logs
          AssertionLoggerHandler.findText("AMQ119119");
       } finally {
@@ -255,6 +258,33 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testSendReceiveLargeMessage() throws Exception {
+      String address = "testLargeMessageAddress";
+      server.getActiveMQServer().createQueue(SimpleString.toSimpleString(address), RoutingType.ANYCAST, SimpleString.toSimpleString(address), null, true, false);
+
+      // STOMP default is UTF-8 == 1 byte per char.
+      int largeMessageStringSize = 10 * 1024 * 1024; // 10MB
+      StringBuilder b = new StringBuilder(largeMessageStringSize);
+      for (int i = 0; i < largeMessageStringSize; i++) {
+         b.append('t');
+      }
+      String payload =  b.toString();
+
+      // Set up STOMP subscription
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null, address, true);
+
+      // Send Large Message
+      System.out.println("Sending Message Size: " + largeMessageStringSize);
+      send(conn, address, null, payload);
+
+      // Receive STOMP Message
+      ClientStompFrame frame = conn.receiveFrame();
+      System.out.println(frame.getBody().length());
+      assertTrue(frame.getBody().equals(payload));
+   }
+
+   @Test
    public void sendMQTTReceiveSTOMP() throws Exception {
       String payload = "This is a test message";
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/905098bc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index f885659..4e84857 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -137,6 +137,7 @@ public abstract class StompTestBase extends ActiveMQTestBase {
       connection.start();
    }
 
+
    /**
     * @return
     * @throws Exception
@@ -168,6 +169,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
          config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
       }
 
+      config.setPersistenceEnabled(true);
+
       ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
 
       if (isSecurityEnabled()) {