You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2023/02/16 21:34:35 UTC

[activemq-artemis] branch main updated (c123a29f8e -> fb169bc4af)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


    from c123a29f8e ARTEMIS-4161 Removing test.log left by accident
     new 82fc42987a ARTEMIS-4171 potential large message file leak
     new fb169bc4af ARTEMIS-4172 sending large msg via core skips plugins & audit log

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../protocol/core/ServerSessionPacketHandler.java  |  7 ++-
 .../tests/integration/client/LargeMessageTest.java | 51 +++++++++++++++++++++-
 .../tests/integration/plugin/CorePluginTest.java   | 18 +++++++-
 .../tests/smoke/logging/AuditLoggerTest.java       | 24 ++++++++--
 4 files changed, 92 insertions(+), 8 deletions(-)


[activemq-artemis] 01/02: ARTEMIS-4171 potential large message file leak

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 82fc42987a2af70600f832037adebde0826c8783
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Feb 15 14:48:53 2023 -0600

    ARTEMIS-4171 potential large message file leak
---
 .../protocol/core/ServerSessionPacketHandler.java  |  7 ++-
 .../tests/integration/client/LargeMessageTest.java | 51 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 2 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 28fea3cd6c..a466321243 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1106,7 +1106,12 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             LargeServerMessage message = currentLargeMessage;
             currentLargeMessage.setStorageManager(storageManager);
             currentLargeMessage = null;
-            session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage)message.toMessage(), storageManager), null, false, producers.get(senderID), false);
+            try {
+               session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false);
+            } catch (Exception e) {
+               message.deleteFile();
+               throw e;
+            }
          }
       }
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 774fcde742..856f3a2f86 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -26,6 +26,7 @@ import javax.transaction.xa.Xid;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.nio.ByteBuffer;
@@ -39,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.sun.management.UnixOperatingSystemMXBean;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -62,7 +64,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ServerProducer;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
@@ -77,7 +81,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 public class LargeMessageTest extends LargeMessageTestBase {
 
@@ -326,6 +329,52 @@ public class LargeMessageTest extends LargeMessageTestBase {
       validateNoFilesOnLargeDir();
    }
 
+   @Test
+   public void testFileRemovalOnFailure() throws Exception {
+      final AtomicBoolean throwException = new AtomicBoolean(false);
+      final String queueName = RandomUtil.randomString();
+      final int messageSize = (int) (3.5 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      server.registerBrokerPlugin(new ActiveMQServerMessagePlugin() {
+         @Override
+         public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
+            if (throwException.get()) {
+               throw new ActiveMQException();
+            }
+         }
+      });
+
+      server.createQueue(new QueueConfiguration(queueName));
+
+      ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = addClientSession(sf.createSession(false, true, false));
+
+      ClientProducer producer = session.createProducer(queueName);
+
+      Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
+
+      try {
+         throwException.set(true);
+         producer.send(clientFile);
+         fail("Should have thrown an exception here");
+      } catch (Exception e) {
+         // expected exception from plugin
+      } finally {
+         throwException.set(false);
+      }
+
+      assertEquals(0, server.locateQueue(queueName).getMessageCount());
+
+      session.close();
+
+      validateNoFilesOnLargeDir();
+   }
+
 
    @Test
    public void testPendingRecord() throws Exception {


[activemq-artemis] 02/02: ARTEMIS-4172 sending large msg via core skips plugins & audit log

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit fb169bc4af15dcad6deb7c050c8177576562e8d6
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Feb 15 16:14:34 2023 -0600

    ARTEMIS-4172 sending large msg via core skips plugins & audit log
---
 .../protocol/core/ServerSessionPacketHandler.java  |  2 +-
 .../tests/integration/plugin/CorePluginTest.java   | 18 ++++++++++++++--
 .../tests/smoke/logging/AuditLoggerTest.java       | 24 ++++++++++++++++++----
 3 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index a466321243..a1dae880d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1107,7 +1107,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             currentLargeMessage.setStorageManager(storageManager);
             currentLargeMessage = null;
             try {
-               session.doSend(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), null, false, producers.get(senderID), false);
+               session.send(session.getCurrentTransaction(), EmbedMessageUtil.extractEmbedded((ICoreMessage) message.toMessage(), storageManager), false, producers.get(senderID), false);
             } catch (Exception e) {
                message.deleteFile();
                throw e;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index a97899cbd2..314492bddb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -53,6 +53,7 @@ import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -125,9 +126,17 @@ public class CorePluginTest extends JMSTestBase {
       queue = createQueue("queue1");
    }
 
-
    @Test
    public void testSendReceive() throws Exception {
+      internalTestSendReceive(64);
+   }
+
+   @Test
+   public void testSendReceiveLarge() throws Exception {
+      internalTestSendReceive(1024 * 1024);
+   }
+
+   private void internalTestSendReceive(int messageSize) throws Exception {
       final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
          assertEquals(AckReason.NORMAL, reason);
          assertNotNull(consumer);
@@ -142,7 +151,12 @@ public class CorePluginTest extends JMSTestBase {
       MessageProducer prod = sess.createProducer(queue);
       MessageConsumer cons = sess.createConsumer(queue);
 
-      TextMessage msg1 = sess.createTextMessage("test");
+      byte[] msgs = new byte[messageSize];
+      for (int i = 0; i < msgs.length; i++) {
+         msgs[i] = RandomUtil.randomByte();
+      }
+
+      TextMessage msg1 = sess.createTextMessage(new String(msgs));
       prod.send(msg1);
       TextMessage received1 = (TextMessage)cons.receive(1000);
       assertNotNull(received1);
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
index 3eb1b79109..116b95def3 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
@@ -119,15 +119,25 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
 
    @Test
    public void testAuditHotLogCore() throws Exception {
-      internalSend("CORE");
+      internalSend("CORE", 64);
    }
 
    @Test
    public void testAuditHotLogAMQP() throws Exception {
-      internalSend("AMQP");
+      internalSend("AMQP", 64);
    }
 
-   public void internalSend(String protocol) throws Exception {
+   @Test
+   public void testAuditHotLogCoreLarge() throws Exception {
+      internalSend("CORE", 1024 * 1024);
+   }
+
+   @Test
+   public void testAuditHotLogAMQPLarge() throws Exception {
+      internalSend("AMQP", 1024 * 1024);
+   }
+
+   public void internalSend(String protocol, int messageSize) throws Exception {
       JMXConnector jmxConnector = getJmxConnector();
       MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
       String brokerName = "0.0.0.0";  // configured e.g. in broker.xml <broker-name> element
@@ -149,7 +159,13 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
       try {
          Session session = connection.createSession();
          MessageProducer producer = session.createProducer(session.createQueue(address.toString()));
-         TextMessage message = session.createTextMessage("msg1");
+
+         byte[] msgs = new byte[messageSize];
+         for (int i = 0; i < msgs.length; i++) {
+            msgs[i] = RandomUtil.randomByte();
+         }
+
+         TextMessage message = session.createTextMessage(new String(msgs));
          message.setStringProperty("str", uniqueStr);
          producer.send(message);