You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/11/28 15:03:48 UTC

[activemq-artemis] branch 2.27.x created (now d79c5c41b0)

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a change to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


      at d79c5c41b0 ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted

This branch includes the following new commits:

     new 66da97a3b1 ARTEMIS-4084 Fixing addSorted with large transactions
     new 220826b9b8 ARTEMIS-4084 Fixing CriticalAnalyzer policy on Test
     new 58f201bbe3 ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed
     new 60c5b9871e ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed
     new 902e912f0d NO-JIRA small tweak on test
     new f81134d748 ARTEMIS-4030 Fix SharedStoreLiveActivation race condition
     new f1030ddc19 ARTEMIS-4085 exclusive LVQ sending all messages to consumer
     new 64918caa10 ARTEMIS-4078 Fix divert reloading
     new b471392872 ARTEMIS-4089 Check on AutoCreation during routing
     new 7e00a701fe ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
     new ecbb0e16d3 ARTEMIS-4092: resolve issues with upgrade backups, tweaks for clarity and consistency, additional output detail
     new bbf2402e12 NO-JIRA improve reconnect doc
     new d79c5c41b0 ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[activemq-artemis] 01/13: ARTEMIS-4084 Fixing addSorted with large transactions

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 66da97a3b151cfbf0f81fb32abda953b361b7170
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Nov 8 09:42:16 2022 -0500

    ARTEMIS-4084 Fixing addSorted with large transactions
    
    when cancelling a large number of messages, the addSorted could be holding a lock for too long causing the server to crash under CriticalAnalyzer
    
    co-authored: AntonRoskvist <an...@volvo.com> (discovering the issue and providing the test ClientCrashMassiveRollbackTest.java)
    (cherry picked from commit 03b82142eb0844b9de02ca3d7ed365d849e3ac02)
---
 .../artemis/utils/collections/LinkedListImpl.java  |  67 ++++++++++--
 .../artemis/core/server/impl/QueueImpl.java        |  42 +++++--
 .../artemis/core/server/impl/RefsOperation.java    |   4 +-
 .../client/ClientCrashMassiveRollbackTest.java     | 121 +++++++++++++++++++++
 .../artemis/tests/unit/util/LinkedListTest.java    |  73 +++++++++++--
 5 files changed, 281 insertions(+), 26 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 76ee69499e..8d1c98eada 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -16,12 +16,16 @@
  */
 package org.apache.activemq.artemis.utils.collections;
 
+import java.lang.invoke.MethodHandles;
 import java.lang.reflect.Array;
 import java.util.Comparator;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
  * elements added or removed from the queue either directly or via iterators.
@@ -30,6 +34,8 @@ import java.util.function.Consumer;
  */
 public class LinkedListImpl<E> implements LinkedList<E> {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
 
    private final Node<E> head = new NodeHolder<>(null);
@@ -42,6 +48,8 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    private int nextIndex;
    private NodeStore<E> nodeStore;
 
+   private volatile Node<E> lastAdd;
+
    public LinkedListImpl() {
       this(null, null);
    }
@@ -155,12 +163,18 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    }
 
    private void itemAdded(Node<E> node, E item) {
+      assert node.val() == item;
+      lastAdd = node;
+      if (logger.isTraceEnabled()) {
+         logger.trace("Setting lastAdd as {}, e={}", lastAdd, lastAdd.val());
+      }
       if (nodeStore != null) {
          putID(item, node);
       }
    }
 
    private void itemRemoved(Node<E> node) {
+      lastAdd = null;
       if (nodeStore != null) {
          nodeStore.removeNode(node.val(), node);
       }
@@ -186,13 +200,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
    }
 
    public void addSorted(E e) {
+      final Node<E> localLastAdd = lastAdd;
+
+      logger.trace("**** addSorted element {}", e);
+
       if (comparator == null) {
          throw new NullPointerException("comparator=null");
       }
+
       if (size == 0) {
+         logger.trace("adding head as there are no elements {}", e);
          addHead(e);
       } else {
          if (comparator.compare(head.next.val(), e) < 0) {
+            if (logger.isTraceEnabled()) {
+               logger.trace("addHead as e={} and head={}", e, head.next.val());
+            }
             addHead(e);
             return;
          }
@@ -203,18 +226,30 @@ public class LinkedListImpl<E> implements LinkedList<E> {
          // This would be an optimization for our usage.
          // avoiding scanning the entire List just to add at the end, so we compare the end first.
          if (comparator.compare(tail.val(), e) >= 0) {
+            logger.trace("addTail as e={} and tail={}", e, tail.val());
             addTail(e);
             return;
          }
 
-         Node<E> fetching = head.next;
-         while (fetching.next != null) {
-            int compareNext = comparator.compare(fetching.next.val(), e);
-            if (compareNext <= 0) {
-               addAfter(fetching, e);
-               return;
+         if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
+            if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
+               if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
+                  logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
+                  addAfter(localLastAdd.prev, e);
+                  return;
+               }
             }
-            fetching = fetching.next;
+            if (localLastAdd.next != null && localLastAdd.next.val() != null) {
+               if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
+                  logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
+                  addAfter(localLastAdd, e);
+                  return;
+               }
+            }
+         }
+
+         if (addSortedScan(e)) {
+            return;
          }
 
          // this shouldn't happen as the tail was compared before iterating
@@ -229,6 +264,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       }
    }
 
+   protected boolean addSortedScan(E e) {
+      logger.trace("addSortedScan {}...", e);
+      Node<E> fetching = head.next;
+      while (fetching.next != null) {
+         int compareNext = comparator.compare(fetching.next.val(), e);
+         if (compareNext <= 0) {
+            addAfter(fetching, e);
+            logger.trace("... addSortedScan done, returning true");
+            return true;
+         }
+         fetching = fetching.next;
+      }
+      logger.trace("... addSortedScan done, could not find a spot, returning false");
+      return false;
+   }
+
    private void addAfter(Node<E> node, E e) {
       Node<E> newNode = Node.with(e);
       Node<E> nextNode = node.next;
@@ -236,7 +287,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       newNode.prev = node;
       newNode.next = nextNode;
       nextNode.prev = newNode;
-      itemAdded(node, e);
+      itemAdded(newNode, e);
       size++;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 908bdca23e..d222d192fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1100,8 +1100,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addHead(final MessageReference ref, boolean scheduling) {
-      if (logger.isDebugEnabled()) {
-         logger.debug("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
+      if (logger.isTraceEnabled()) {
+         logger.trace("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
       }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
          synchronized (this) {
@@ -1125,11 +1125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addSorted(final MessageReference ref, boolean scheduling) {
-      if (logger.isDebugEnabled()) {
-         logger.debug("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
+      if (logger.isTraceEnabled()) {
+         logger.trace("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
       }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
-         synchronized (this) {
+         synchronized (QueueImpl.this) {
             if (ringSize != -1) {
                enforceRing(ref, false, true);
             }
@@ -1165,6 +1165,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    /* Called when a message is cancelled back into the queue */
    @Override
    public void addSorted(final List<MessageReference> refs, boolean scheduling) {
+      if (refs.size() > MAX_DELIVERIES_IN_LOOP) {
+         logger.debug("Switching addSorted call to addSortedLargeTX on queue {}", name);
+         addSortedLargeTX(refs, scheduling);
+         return;
+      }
       try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
          synchronized (this) {
             for (MessageReference ref : refs) {
@@ -1178,6 +1183,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   // Perhaps we could just replace addSorted by addSortedLargeTX
+   // However I am not 100% confident we could always resetAllIterators
+   // we certainly can in the case of a rollback in a huge TX.
+   // so I am just playing safe and keeping the original semantic for small transactions.
+   private void addSortedLargeTX(final List<MessageReference> refs, boolean scheduling) {
+      for (MessageReference ref : refs) {
+         // When dealing with large transactions, we are not holding a synchronization lock here.
+         // addSorted will lock for each individual adds
+         addSorted(ref, scheduling);
+      }
+
+      if (logger.isDebugEnabled()) {
+         logger.debug("addSortedHugeLoad finished on queue {}", name);
+      }
+
+      synchronized (this) {
+
+         resetAllIterators();
+
+         deliverAsync();
+      }
+   }
+
    @Override
    public synchronized void reload(final MessageReference ref) {
       queueMemorySize.addSize(ref.getMessageMemoryEstimate());
@@ -2983,8 +3011,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     * are no more matching or available messages.
     */
    private boolean deliver() {
-      if (logger.isDebugEnabled()) {
-         logger.debug("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
+      if (logger.isTraceEnabled()) {
+         logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
       }
 
       scheduledRunners.decrementAndGet();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index 65f39783c9..6930047c4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -127,9 +127,7 @@ public class RefsOperation extends TransactionOperationAbstract {
 
          QueueImpl queue = entry.getKey();
 
-         synchronized (queue) {
-            queue.postRollback(refs);
-         }
+         queue.postRollback(refs);
       }
 
       if (!ackedRefs.isEmpty()) {
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
new file mode 100644
index 0000000000..9e5bbbef65
--- /dev/null
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.soak.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
+   protected ActiveMQServer server;
+   protected ClientSession session;
+   protected ClientSessionFactory sf;
+   protected ServerLocator locator;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      Configuration config = createDefaultNettyConfig();
+      config.setCriticalAnalyzer(true);
+      config.setCriticalAnalyzerTimeout(10000);
+      config.setCriticalAnalyzerCheckPeriod(5000);
+      config.setConnectionTTLOverride(5000);
+      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
+      server = createServer(false, config);
+      server.start();
+   }
+
+   @Test
+   public void clientCrashMassiveRollbackTest() throws Exception {
+      final String queueName = "queueName";
+      final int messageCount = 1000000;
+
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("(tcp://localhost:61616)");
+      factory.setConsumerWindowSize(-1);
+      factory.setConfirmationWindowSize(10240000);
+      Connection connection = factory.createConnection();
+      connection.start();
+
+      Thread thread = new Thread(() -> {
+         try {
+            Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Queue destination = consumerSession.createQueue(queueName);
+            MessageConsumer consumer = consumerSession.createConsumer(destination);
+            for (;;) {
+               consumer.receive();
+            }
+         } catch (Exception e) {
+         }
+      });
+
+      locator = createNettyNonHALocator();
+      locator.setConfirmationWindowSize(10240000);
+      sf = createSessionFactory(locator);
+      session = addClientSession(sf.createSession(false, true, true));
+      SendAcknowledgementHandler sendHandler = message -> {
+      };
+      session.setSendAcknowledgementHandler(sendHandler);
+      session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
+      ClientProducer producer = session.createProducer(queueName);
+      QueueControl queueControl = (QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
+
+      thread.start();
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createMessage(true));
+      }
+      producer.close();
+
+      while (queueControl.getDeliveringCount() < messageCount) {
+         Thread.sleep(1000);
+      }
+
+      thread.interrupt();
+
+      Assert.assertEquals(messageCount, queueControl.getMessageCount());
+      Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTED, server.getState());
+
+      server.stop();
+
+      Wait.assertEquals(ActiveMQServer.SERVER_STATE.STOPPED, server::getState, 5000, 100);
+
+
+   }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index 2245e8b02c..f95e91fc2c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.unit.util;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -38,27 +39,42 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class LinkedListTest extends ActiveMQTestBase {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private int scans = 0;
    private LinkedListImpl<Integer> list;
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      list = new LinkedListImpl<>(integerComparator);
+      list = new LinkedListImpl<>(integerComparator) {
+         @Override
+         protected boolean addSortedScan(Integer e) {
+            scans++;
+            return super.addSortedScan(e);
+         }
+      };
    }
 
    Comparator<Integer> integerComparator = new Comparator<Integer>() {
       @Override
       public int compare(Integer o1, Integer o2) {
+         logger.trace("Compare {} and {}", o1, o2);
          if (o1.intValue() == o2.intValue()) {
+            logger.trace("Return 0");
             return 0;
          }
          if (o2.intValue() > o1.intValue()) {
+            logger.trace("o2 is greater than, returning 1");
             return 1;
          } else {
+            logger.trace("o2 is lower than, returning -1");
             return -1;
          }
       }
@@ -66,27 +82,68 @@ public class LinkedListTest extends ActiveMQTestBase {
 
    @Test
    public void addSorted() {
+      Assert.assertEquals(0, scans); // sanity check
 
       list.addSorted(1);
       list.addSorted(3);
       list.addSorted(2);
       list.addSorted(0);
+
+      Assert.assertEquals(0, scans); // all adds were somewhat ordered, it shouldn't be doing any scans
+
       validateOrder(null);
       Assert.assertEquals(4, list.size());
 
    }
 
+   @Test
+   public void addSortedCachedLast() {
+      Assert.assertEquals(0, scans); // just a sanity check
+      list.addSorted(5);
+      list.addSorted(1);
+      list.addSorted(3);
+      list.addSorted(4);
+      Assert.assertEquals(0, scans); // no scans made until now
+      list.addSorted(2); // this should need a scan
+      Assert.assertEquals(1, scans);
+      list.addSorted(10);
+      list.addSorted(20);
+      list.addSorted(7); // this will need a scan as it's totally random
+      Assert.assertEquals(2, scans);
+      printDebug();
+
+      validateOrder(null);
+
+   }
+
+   private void printDebug() {
+      if (logger.isDebugEnabled()) {
+         logger.debug("**** list output:");
+         LinkedListIterator<Integer> integerIterator = list.iterator();
+         while (integerIterator.hasNext()) {
+            logger.debug("list {}", integerIterator.next());
+         }
+         integerIterator.close();
+      }
+   }
 
    @Test
    public void randomSorted() {
 
-      HashSet<Integer> values = new HashSet<>();
-      for (int i = 0; i < 1000; i++) {
+      int elements = 10_000;
 
-         int value = RandomUtil.randomInt();
-         if (!values.contains(value)) {
-            values.add(value);
-            list.addSorted(value);
+      HashSet<Integer> values = new HashSet<>();
+      for (int i = 0; i < elements; i++) {
+         for (;;) { // a retry loop, if a random give me the same value twice, I would retry
+            int value = RandomUtil.randomInt();
+            if (!values.contains(value)) { // validating if the random is repeated or not, and retrying otherwise
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Adding {}", value);
+               }
+               values.add(value);
+               list.addSorted(value);
+               break;
+            }
          }
       }
 
@@ -102,8 +159,8 @@ public class LinkedListTest extends ActiveMQTestBase {
       Integer previous = null;
       LinkedListIterator<Integer> integerIterator = list.iterator();
       while (integerIterator.hasNext()) {
-
          Integer value = integerIterator.next();
+         logger.debug("Reading {}", value);
          if (previous != null) {
             Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0);
             Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());


[activemq-artemis] 10/13: ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 7e00a701fe3ae03c196ab3669eeccb2fcc5cac6c
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Nov 17 15:34:44 2022 -0500

    ARTEMIS-4096 Bridge transfer is broken with AMQP Large messages
    
    (cherry picked from commit 0866a2eb8846284d2b865ef876b146cd80270f59)
---
 .../spi/core/protocol/EmbedMessageUtil.java        | 10 ++-
 .../AMQPLargeMessageOverCoreBridgeTest.java        |  2 +
 .../ClusteredLargeMessageTest.java                 | 78 +++++++++++++++++++++-
 3 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
index 5678f1b54c..f3a9c80769 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/EmbedMessageUtil.java
@@ -112,7 +112,15 @@ public class EmbedMessageUtil {
    private static Message extractLargeMessage(ICoreMessage message, StorageManager storageManager) {
       ActiveMQBuffer buffer = ActiveMQBuffers.wrappedBuffer(message.getBytesProperty(AMQP_ENCODE_PROPERTY));
 
-      return readEncoded(message, storageManager, buffer);
+      Message largeMessageReturn = readEncoded(message, storageManager, buffer);
+
+      if (message instanceof LargeServerMessage && largeMessageReturn instanceof LargeServerMessage) {
+         LargeServerMessage returnMessage = (LargeServerMessage) largeMessageReturn;
+         LargeServerMessage sourceMessage = (LargeServerMessage) message;
+         returnMessage.setPendingRecordID(sourceMessage.getPendingRecordID());
+      }
+
+      return largeMessageReturn;
    }
 
    private static boolean checkSignature(final ActiveMQBuffer buffer) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
index c63bc9900d..0f1f5c4860 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/AMQPLargeMessageOverCoreBridgeTest.java
@@ -123,6 +123,8 @@ public class AMQPLargeMessageOverCoreBridgeTest extends AmqpClientTestSupport {
       }
 
       sendTextMessages(AMQP_PORT + 1, getQueueName(useDivert ? 0 : 1), largeText.toString(), 10);
+      server.stop();
+      server.start();
       receiveTextMessages(AMQP_PORT, getQueueName(2), largeText.toString(), 10);
       if (useDivert) {
          // We diverted, so messages were copied, we need to make sure we consume from the original queue
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
index eb6bece11f..f51bd5cd2c 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/clusteredLargeMessage/ClusteredLargeMessageTest.java
@@ -36,14 +36,15 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
    public static final String SERVER_NAME_0 = "clusteredLargeMessage/cluster1";
    public static final String SERVER_NAME_1 = "clusteredLargeMessage/cluster2";
 
+   Process server0Process;
    Process server1Process;
 
    @Before
    public void before() throws Exception {
       cleanupData(SERVER_NAME_0);
       cleanupData(SERVER_NAME_1);
-      server1Process = startServer(SERVER_NAME_0, 0, 30000);
-      startServer(SERVER_NAME_1, 100, 30000);
+      server0Process = startServer(SERVER_NAME_0, 0, 30000);
+      server1Process = startServer(SERVER_NAME_1, 100, 30000);
    }
 
    @Test
@@ -96,5 +97,78 @@ public class ClusteredLargeMessageTest extends SmokeTestBase {
       connection1.close();
       connection2.close();
    }
+
+   @Test
+   public void testKillWhileSendingLargeCORE() throws Exception {
+      testKillWhileSendingLarge("CORE");
+   }
+
+   @Test
+   public void testKillWhileSendingLargeAMQP() throws Exception {
+      testKillWhileSendingLarge("AMQP");
+   }
+
+   public void testKillWhileSendingLarge(String protocol) throws Exception {
+
+      ConnectionFactory server2CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61716");
+      Connection keepConsumerConnection = server2CF.createConnection();
+      Session keepConsumerSession = keepConsumerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+      // a consumer that we should keep to induce message redistribution
+      MessageConsumer keepConsumer = keepConsumerSession.createConsumer(keepConsumerSession.createQueue("testQueue"));
+
+      String largeBody;
+      {
+         StringBuffer largeBodyBuffer = new StringBuffer();
+         while (largeBodyBuffer.length() < 1024 * 1024) {
+            largeBodyBuffer.append("This is large ");
+         }
+         largeBody = largeBodyBuffer.toString();
+      }
+
+      int NMESSAGES = 10;
+
+      ConnectionFactory server1CF = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      try (Connection connection1 = server1CF.createConnection()) {
+         Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
+         Queue queue1 = session1.createQueue("testQueue");
+         MessageProducer producer1 = session1.createProducer(queue1);
+         for (int i = 0; i < NMESSAGES; i++) {
+            TextMessage message = session1.createTextMessage(largeBody);
+            message.setStringProperty("i", Integer.toString(i));
+            producer1.send(message);
+
+            if (i == 5) {
+               session1.commit();
+            }
+         }
+         session1.commit();
+      }
+
+      keepConsumerConnection.close();
+      server1Process.destroyForcibly();
+      server1Process = startServer(SERVER_NAME_1, 100, 0);
+
+      for (int i = 0; i < 100; i++) {
+         // retrying the connection until the server is up
+         try (Connection willbegone = server2CF.createConnection()) {
+            break;
+         } catch (Exception ignored) {
+            Thread.sleep(100);
+         }
+      }
+
+      try (Connection connection2 = server2CF.createConnection()) {
+         Session session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         Queue queue2 = session2.createQueue("testQueue");
+         MessageConsumer consumer2 = session2.createConsumer(queue2);
+         connection2.start();
+
+         for (int i = 0; i < NMESSAGES; i++) {
+            TextMessage message = (TextMessage) consumer2.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(largeBody, message.getText());
+         }
+      }
+   }
 }
 


[activemq-artemis] 12/13: NO-JIRA improve reconnect doc

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit bbf2402e121fa65ce15b0e07b018f1575b8027db
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Nov 23 19:32:02 2022 -0600

    NO-JIRA improve reconnect doc
    
    (cherry picked from commit bbd5043f4fc9dd9233762284595a11f60ec918da)
---
 docs/user-manual/en/client-reconnection.md | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/docs/user-manual/en/client-reconnection.md b/docs/user-manual/en/client-reconnection.md
index 51db603b0f..8f87877db1 100644
--- a/docs/user-manual/en/client-reconnection.md
+++ b/docs/user-manual/en/client-reconnection.md
@@ -6,9 +6,9 @@ connection between the client and the server.
 
 ## 100% Transparent session re-attachment
 
-If the failure was due to some transient failure such as a temporary network
-failure, and the target server was not restarted, then the sessions will still
-be existent on the server, assuming the client hasn't been disconnected for
+If the disconnection was due to some transient failure such as a temporary
+network outage and the target server was not restarted, then the sessions will
+still exist on the server, assuming the client hasn't been disconnected for
 more than [connection-ttl](connection-ttl.md)
 
 In this scenario, Apache ActiveMQ Artemis will automatically re-attach the
@@ -44,13 +44,13 @@ occur)
 ## Session reconnection
 
 Alternatively, the server might have actually been restarted after crashing or
-being stopped. In this case any sessions will no longer be existent on the
-server and it won't be possible to 100% transparently re-attach to them.
+being stopped. In this case any sessions will no longer exist on the server and
+it won't be possible to 100% transparently re-attach to them.
 
-In this case, Apache ActiveMQ Artemis will automatically reconnect the
-connection and *recreate* any sessions and consumers on the server
-corresponding to the sessions and consumers on the client. This process is
-exactly the same as what happens during failover onto a backup server.
+In this case, the Apache ActiveMQ Artemis client will automatically reconnect
+and *recreate* any sessions and consumers on the server corresponding to the
+sessions and consumers on the client. This process is exactly the same as what
+happens during failover onto a backup server.
 
 Client reconnection is also used internally by components such as core bridges
 to allow them to reconnect to their target servers.


[activemq-artemis] 03/13: ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 58f201bbe3c0902d509f85b7f5d15f6425f324fb
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Nov 10 06:27:10 2022 -0500

    ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed
    
    (cherry picked from commit 9528e4586914808cc9568533f16c12310e743ae1)
---
 .../core/client/impl/ClientProducerImpl.java       | 110 +++++++++++----------
 .../activemq/artemis/utils/DeflaterReader.java     |   3 +-
 .../tests/integration/client/LargeMessageTest.java |  58 +++++++++++
 3 files changed, 116 insertions(+), 55 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index a4e4a7eb99..bdb3a02223 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -434,79 +434,81 @@ public class ClientProducerImpl implements ClientProducerInternal {
 
       boolean headerSent = false;
 
-      int reconnectID = sessionContext.getReconnectID();
-      while (!lastPacket) {
-         byte[] buff = new byte[minLargeMessageSize];
-
-         int pos = 0;
-
-         do {
-            int numberOfBytesRead;
-
-            int wanted = minLargeMessageSize - pos;
+      try {
+         int reconnectID = sessionContext.getReconnectID();
+         while (!lastPacket) {
+            byte[] buff = new byte[minLargeMessageSize];
 
-            try {
-               numberOfBytesRead = input.read(buff, pos, wanted);
-            } catch (IOException e) {
-               throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
-            }
+            int pos = 0;
 
-            if (numberOfBytesRead == -1) {
-               lastPacket = true;
+            do {
+               int numberOfBytesRead;
 
-               break;
-            }
+               int wanted = minLargeMessageSize - pos;
 
-            pos += numberOfBytesRead;
-         }
-         while (pos < minLargeMessageSize);
+               try {
+                  numberOfBytesRead = input.read(buff, pos, wanted);
+               } catch (IOException e) {
+                  throw ActiveMQClientMessageBundle.BUNDLE.errorReadingBody(e);
+               }
 
-         totalSize += pos;
+               if (numberOfBytesRead == -1) {
+                  lastPacket = true;
 
-         if (lastPacket) {
-            if (!session.isCompressLargeMessages()) {
-               messageSize.set(totalSize);
-            }
-
-            // This is replacing the last packet by a smaller packet
-            byte[] buff2 = new byte[pos];
+                  break;
+               }
 
-            System.arraycopy(buff, 0, buff2, 0, pos);
+               pos += numberOfBytesRead;
+            } while (pos < minLargeMessageSize);
 
-            buff = buff2;
+            totalSize += pos;
 
-            // This is the case where the message is being converted as a regular message
-            if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) {
-               msgI.getBodyBuffer().resetReaderIndex();
-               msgI.getBodyBuffer().resetWriterIndex();
-               msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+            if (lastPacket) {
+               if (!session.isCompressLargeMessages()) {
+                  messageSize.set(totalSize);
+               }
 
-               msgI.getBodyBuffer().writeBytes(buff, 0, pos);
-               sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
-               return;
+               // This is replacing the last packet by a smaller packet
+               byte[] buff2 = new byte[pos];
+
+               System.arraycopy(buff, 0, buff2, 0, pos);
+
+               buff = buff2;
+
+               // This is the case where the message is being converted as a regular message
+               if (!headerSent && session.isCompressLargeMessages() && buff2.length < minLargeMessageSize) {
+                  msgI.getBodyBuffer().resetReaderIndex();
+                  msgI.getBodyBuffer().resetWriterIndex();
+                  msgI.putLongProperty(Message.HDR_LARGE_BODY_SIZE, deflaterReader.getTotalSize());
+
+                  msgI.getBodyBuffer().writeBytes(buff, 0, pos);
+                  sendRegularMessage(msgI.getAddressSimpleString(), msgI, sendBlocking, credits, handler);
+                  return;
+               } else {
+                  if (!headerSent) {
+                     headerSent = true;
+                     sendInitialLargeMessageHeader(msgI, credits);
+                  }
+                  int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+                  credits.acquireCredits(creditsSent);
+               }
             } else {
                if (!headerSent) {
                   headerSent = true;
                   sendInitialLargeMessageHeader(msgI, credits);
                }
-               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, true, buff, reconnectID, handler);
+
+               int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
                credits.acquireCredits(creditsSent);
             }
-         } else {
-            if (!headerSent) {
-               headerSent = true;
-               sendInitialLargeMessageHeader(msgI, credits);
-            }
-
-            int creditsSent = sessionContext.sendLargeMessageChunk(msgI, messageSize.get(), sendBlocking, false, buff, reconnectID, handler);
-            credits.acquireCredits(creditsSent);
+         }
+      } finally {
+         try {
+            input.close();
+         } catch (IOException e) {
+            throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
          }
       }
 
-      try {
-         input.close();
-      } catch (IOException e) {
-         throw ActiveMQClientMessageBundle.BUNDLE.errorClosingLargeMessage(e);
-      }
    }
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java
index 2f7844ae39..4443687bde 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/DeflaterReader.java
@@ -107,7 +107,8 @@ public class DeflaterReader extends InputStream {
       return read;
    }
 
-   public void closeStream() throws IOException {
+   @Override
+   public void close() throws IOException {
       super.close();
       input.close();
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 11b1204249..2e00436088 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -24,6 +24,8 @@ import javax.jms.Session;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.management.ManagementFactory;
 import java.lang.management.OperatingSystemMXBean;
 import java.nio.ByteBuffer;
@@ -31,6 +33,7 @@ import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.sun.management.UnixOperatingSystemMXBean;
@@ -2791,6 +2794,61 @@ public class LargeMessageTest extends LargeMessageTestBase {
       Wait.assertTrue(() -> ((UnixOperatingSystemMXBean)os).getOpenFileDescriptorCount() - fdBefore < 3);
    }
 
+   @Test
+   public void testStream() throws Exception {
+      ActiveMQServer server = createServer(true, isNetty(), storeType);
+
+      server.start();
+
+      locator.setCompressLargeMessage(true);
+
+      ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
+
+      ClientSession session = sf.createSession(false, false);
+
+      final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+      server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
+
+      ClientProducer producer = session.createProducer(MY_QUEUE);
+
+      AtomicBoolean closed = new AtomicBoolean(false);
+
+      InputStream inputStream = new InputStream() {
+         int bytes = 10_000;
+         @Override
+         public int read() throws IOException {
+            if (bytes-- > 0) {
+               return 10;
+            } else {
+               return -1;
+            }
+         }
+
+         @Override
+         public void close() {
+            closed.set(true);
+         }
+
+
+         @Override
+         public int available () throws IOException {
+            return bytes;
+         }
+
+      };
+
+      ClientMessage message = session.createMessage(true);
+      message.setBodyInputStream(inputStream);
+      producer.send(message);
+
+      Wait.assertTrue(closed::get);
+
+      session.close();
+
+   }
+
+
 
 
 }
\ No newline at end of file


[activemq-artemis] 13/13: ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d79c5c41b0f5222a22a6cb71a4ed19084e0180fd
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Nov 25 11:41:03 2022 -0500

    ARTEMIS-4084 Dealing with multi consumers crashes, Improving cached addSorted
    
    (cherry picked from commit af2b8e4b072e13a9925c6fecd048be130cf3688e)
---
 .../artemis/utils/collections/LinkedListImpl.java  | 44 ++++++++++++++++++----
 .../client/ClientCrashMassiveRollbackTest.java     |  4 ++
 .../artemis/tests/unit/util/LinkedListTest.java    | 40 +++++++++++++++-----
 3 files changed, 71 insertions(+), 17 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 8d1c98eada..5e36d333d2 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -232,17 +232,20 @@ public class LinkedListImpl<E> implements LinkedList<E> {
          }
 
          if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
-            if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
-               if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
-                  logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
-                  addAfter(localLastAdd.prev, e);
+            if (logger.isDebugEnabled()) {
+               logger.debug("localLastAdd Value = {}, we are adding {}", localLastAdd.val(), e);
+            }
+
+            int compareLastAdd = comparator.compare(localLastAdd.val(), e);
+
+            if (compareLastAdd > 0) {
+               if (scanRight(localLastAdd, e)) {
                   return;
                }
             }
-            if (localLastAdd.next != null && localLastAdd.next.val() != null) {
-               if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
-                  logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
-                  addAfter(localLastAdd, e);
+
+            if (compareLastAdd < 0) {
+               if (scanLeft(localLastAdd, e)) {
                   return;
                }
             }
@@ -264,6 +267,31 @@ public class LinkedListImpl<E> implements LinkedList<E> {
       }
    }
 
+   protected boolean scanRight(Node<E> position, E e) {
+      Node<E> fetching = position.next;
+      while (fetching != null) {
+         if (comparator.compare(fetching.val(), e) < 0) {
+            addAfter(position, e);
+            return true;
+         }
+         position = fetching;
+         fetching = fetching.next;
+      }
+      return false; // unlikely to happen, using this just to be safe
+   }
+
+   protected boolean scanLeft(Node<E> position, E e) {
+      Node<E> fetching = position.prev;
+      while (fetching != null) {
+         if (comparator.compare(fetching.val(), e) > 0) {
+            addAfter(fetching, e);
+            return true;
+         }
+         fetching = fetching.prev;
+      }
+      return false; // unlikely to happen, using this just to be safe
+   }
+
    protected boolean addSortedScan(E e) {
       logger.trace("addSortedScan {}...", e);
       Node<E> fetching = head.next;
diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
index e4641f9696..82c22f4e44 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -77,8 +77,12 @@ public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
             Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
             Queue destination = consumerSession.createQueue(queueName);
             MessageConsumer consumer = consumerSession.createConsumer(destination);
+            MessageConsumer consumer2 = consumerSession.createConsumer(destination);
+            MessageConsumer consumer3 = consumerSession.createConsumer(destination);
             for (;;) {
                consumer.receive();
+               consumer2.receive();
+               consumer3.receive();
             }
          } catch (Exception e) {
          }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index b39a9e75dc..eacd29e469 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -33,9 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import io.netty.util.collection.LongObjectHashMap;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.RandomUtil;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,23 +103,45 @@ public class LinkedListTest extends ActiveMQTestBase {
       list.addSorted(1);
       list.addSorted(3);
       list.addSorted(4);
-      Assert.assertEquals(0, scans); // no scans made until now
-      list.addSorted(2); // this should need a scan
-      Assert.assertEquals(1, scans);
+      list.addSorted(2);
       list.addSorted(10);
       list.addSorted(20);
       list.addSorted(19);
-      list.addSorted(7); // this will need a scan as it's totally random
-      Assert.assertEquals(2, scans);
+      list.addSorted(7);
       list.addSorted(8);
-      Assert.assertEquals(2, scans);
+      Assert.assertEquals(0, scans); // no full scans should be done
       Assert.assertEquals(1, (int)list.poll());
       list.addSorted(9);
-      Assert.assertEquals(3, scans); // remove (poll) should clear the last added cache
-      printDebug();
+      Assert.assertEquals(1, scans); // remove (poll) should clear the last added cache, a scan will be needed
 
+      printDebug();
       validateOrder(null);
+   }
 
+   @Test
+   public void scanDirectionalTest() {
+      list.addSorted(9);
+      Assert.assertEquals(1, list.size());
+      list.addSorted(5);
+      Assert.assertEquals(2, list.size());
+      list.addSorted(6);
+      Assert.assertEquals(3, list.size());
+      list.addSorted(2);
+      Assert.assertEquals(4, list.size());
+      list.addSorted(7);
+      Assert.assertEquals(5, list.size());
+      list.addSorted(4);
+      Assert.assertEquals(6, list.size());
+      list.addSorted(8);
+      Assert.assertEquals(7, list.size());
+      list.addSorted(1);
+      Assert.assertEquals(8, list.size());
+      list.addSorted(10);
+      Assert.assertEquals(9, list.size());
+      list.addSorted(3);
+      Assert.assertEquals(10, list.size());
+      printDebug();
+      validateOrder(null);
    }
 
    private void printDebug() {


[activemq-artemis] 06/13: ARTEMIS-4030 Fix SharedStoreLiveActivation race condition

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f81134d748c9eb0427adbe578bfac74faae9f77f
Author: Domenico Francesco Bruscino <br...@apache.org>
AuthorDate: Fri Oct 7 09:09:07 2022 +0200

    ARTEMIS-4030 Fix SharedStoreLiveActivation race condition
    
    DefaultCriticalErrorListener stops the server while SharedStoreLiveActivation
    is initializing it.
    
    (cherry picked from commit 3a5e4ce36300573525b96b101b69a0a287e1a8ea)
---
 .../core/server/impl/SharedStoreLiveActivation.java        | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
index b4e2f2ac73..635b6f317b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java
@@ -85,16 +85,18 @@ public final class SharedStoreLiveActivation extends LiveActivation {
          nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
          activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
 
-         if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
+         synchronized (activeMQServer) {
+            if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
                || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
-            return;
-         }
+               return;
+            }
 
-         activeMQServer.initialisePart2(false);
+            activeMQServer.initialisePart2(false);
 
-         activeMQServer.completeActivation(false);
+            activeMQServer.completeActivation(false);
 
-         ActiveMQServerLogger.LOGGER.serverIsLive();
+            ActiveMQServerLogger.LOGGER.serverIsLive();
+         }
       } catch (NodeManagerException nodeManagerException) {
          if (nodeManagerException.getCause() instanceof ClosedChannelException) {
             // this is ok, we are being stopped


[activemq-artemis] 09/13: ARTEMIS-4089 Check on AutoCreation during routing

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit b471392872cf15c2a29f2b3c4e9a9ca4a9586e49
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Nov 11 07:24:15 2022 -0500

    ARTEMIS-4089 Check on AutoCreation during routing
    
    (cherry picked from commit 4f79eb42f53071e3606a4848ec72dd164a88e350)
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  48 +---
 .../core/postoffice/impl/PostOfficeImpl.java       |  94 +++++--
 .../artemis/core/server/RoutingContext.java        |   4 +
 .../artemis/core/server/ServerSession.java         |   2 +
 .../core/server/impl/RoutingContextImpl.java       |  30 ++-
 .../core/server/impl/ServerSessionImpl.java        |  57 +++-
 .../integration/amqp/AmqpExpiredMessageTest.java   |   9 +
 .../integration/client/DeleteAddressTest.java      | 291 +++++++++++++++++++++
 .../tests/integration/client/LargeMessageTest.java |   3 +-
 .../integration/management/AddressControlTest.java |   1 +
 10 files changed, 465 insertions(+), 74 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index efa25c89cb..637cdf10f2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -20,7 +20,6 @@ import java.util.Map;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
@@ -47,7 +45,6 @@ import org.apache.activemq.artemis.core.server.ServerProducer;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -332,45 +329,8 @@ public class AMQPSessionCallback implements SessionCallback {
    }
 
 
-
    public boolean checkAddressAndAutocreateIfPossible(SimpleString address, RoutingType routingType) throws Exception {
-      boolean result = false;
-      SimpleString unPrefixedAddress = serverSession.removePrefix(address);
-      AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
-
-      if (routingType == RoutingType.MULTICAST) {
-         if (manager.getServer().getAddressInfo(unPrefixedAddress) == null) {
-            if (addressSettings.isAutoCreateAddresses()) {
-               try {
-                  serverSession.createAddress(address, routingType, true);
-               } catch (ActiveMQAddressExistsException e) {
-                  // The address may have been created by another thread in the mean time.  Catch and do nothing.
-               }
-               result = true;
-            }
-         } else {
-            result = true;
-         }
-      } else if (routingType == RoutingType.ANYCAST) {
-         if (manager.getServer().locateQueue(unPrefixedAddress) == null) {
-            Bindings bindings = manager.getServer().getPostOffice().lookupBindingsForAddress(address);
-            if (bindings != null) {
-               // this means the address has another queue with a different name, which is fine, we just ignore it on this case
-               result = true;
-            } else if (addressSettings.isAutoCreateQueues()) {
-               try {
-                  serverSession.createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
-               } catch (ActiveMQQueueExistsException e) {
-                  // The queue may have been created by another thread in the mean time.  Catch and do nothing.
-               }
-               result = true;
-            }
-         } else {
-            result = true;
-         }
-      }
-
-      return result;
+      return serverSession.checkAutoCreate(address, routingType);
    }
 
    public AddressQueryResult addressQuery(SimpleString addressName,
@@ -506,7 +466,11 @@ public class AMQPSessionCallback implements SessionCallback {
 
       //here check queue-autocreation
       if (!checkAddressAndAutocreateIfPossible(address, routingType)) {
-         throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+         ActiveMQException e = ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist();
+         if (transaction != null) {
+            transaction.markAsRollbackOnly(e);
+         }
+         throw e;
       }
 
       OperationContext oldcontext = recoverContext();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 5949ad8448..b9c60f1fb3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -38,6 +38,7 @@ import java.util.stream.Stream;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQShutdownException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -1154,7 +1155,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
       message.clearInternalProperties();
       Bindings bindings;
-      final AddressInfo addressInfo = addressManager.getAddressInfo(address);
+      final AddressInfo addressInfo = checkAddress(context, address);
+
+      final RoutingStatus status;
       if (bindingMove != null) {
          context.clear();
          context.setReusable(false);
@@ -1162,18 +1165,28 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
          if (addressInfo != null) {
             addressInfo.incrementRoutedMessageCount();
          }
-      } else if ((bindings = addressManager.getBindingsForRoutingAddress(address)) != null) {
-         bindings.route(message, context);
-         if (addressInfo != null) {
-            addressInfo.incrementRoutedMessageCount();
-         }
+         status = RoutingStatus.OK;
       } else {
-         context.setReusable(false);
-         if (addressInfo != null) {
-            addressInfo.incrementUnRoutedMessageCount();
+         bindings = simpleRoute(address, context, message, addressInfo);
+         if (logger.isDebugEnabled()) {
+            if (bindings != null) {
+               logger.debug("PostOffice::simpleRoute returned bindings with size = {}", bindings.getBindings().size());
+            } else {
+               logger.debug("PostOffice::simpleRoute null as bindings");
+            }
+         }
+         if (bindings == null) {
+            context.setReusable(false);
+            context.clear();
+            if (addressInfo != null) {
+               addressInfo.incrementUnRoutedMessageCount();
+            }
+            // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
+            logger.debug("Couldn't find any bindings for address={} on message={}", address, message);
+            status = RoutingStatus.NO_BINDINGS;
+         } else {
+            status = RoutingStatus.OK;
          }
-         // this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
-         logger.debug("Couldn't find any bindings for address={} on message={}", address, message);
       }
 
       if (server.hasBrokerMessagePlugins()) {
@@ -1182,14 +1195,20 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       logger.trace("Message after routed={}\n{}", message, context);
 
+      final RoutingStatus finalStatus;
       try {
-         final RoutingStatus status;
-         if (context.getQueueCount() == 0) {
-            status = maybeSendToDLA(message, context, address, sendToDLA);
+         if ( status == RoutingStatus.NO_BINDINGS) {
+            finalStatus = maybeSendToDLA(message, context, address, sendToDLA);
          } else {
-            status = RoutingStatus.OK;
+            finalStatus = status;
             try {
-               processRoute(message, context, direct);
+               if (context.getQueueCount() > 0) {
+                  processRoute(message, context, direct);
+               } else {
+                  if (message.isLargeMessage()) {
+                     ((LargeServerMessage) message).deleteFile();
+                  }
+               }
             } catch (ActiveMQAddressFullException e) {
                if (startedTX) {
                   context.getTransaction().rollback();
@@ -1203,9 +1222,9 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             context.getTransaction().commit();
          }
          if (server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, status));
+            server.callBrokerMessagePlugins(plugin -> plugin.afterMessageRoute(message, context, direct, rejectDuplicates, finalStatus));
          }
-         return status;
+         return finalStatus;
       } catch (Exception e) {
          if (server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.onMessageRouteException(message, context, direct, rejectDuplicates, e));
@@ -1214,6 +1233,45 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
    }
 
+   private AddressInfo checkAddress(RoutingContext context, SimpleString address) throws Exception {
+      AddressInfo addressInfo = addressManager.getAddressInfo(address);
+      if (addressInfo == null && context.getServerSession() != null) {
+         if (context.getServerSession().checkAutoCreate(address, context.getRoutingType())) {
+            addressInfo = addressManager.getAddressInfo(address);
+         } else {
+            ActiveMQException ex = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
+            if (context.getTransaction() != null) {
+               context.getTransaction().markAsRollbackOnly(ex);
+            }
+            throw ex;
+         }
+      }
+      return addressInfo;
+   }
+
+   Bindings simpleRoute(SimpleString address, RoutingContext context, Message message, AddressInfo addressInfo) throws Exception {
+      Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
+      if (bindings == null && context.getServerSession() != null) {
+         if (!context.getServerSession().checkAutoCreate(address, context.getRoutingType())) {
+            ActiveMQException e = ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(address);
+            Transaction tx = context.getTransaction();
+            if (tx != null) {
+               tx.markAsRollbackOnly(e);
+            }
+            throw e;
+         }
+         bindings = addressManager.getBindingsForRoutingAddress(address);
+      }
+      if (bindings != null) {
+         bindings.route(message, context);
+         if (addressInfo != null) {
+            addressInfo.incrementRoutedMessageCount();
+         }
+      }
+      return bindings;
+   }
+
+
    private RoutingStatus maybeSendToDLA(final Message message,
                                         final RoutingContext context,
                                         final SimpleString address,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index f6cff3d1be..d95c7ae9d8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -103,5 +103,9 @@ public interface RoutingContext {
 
    MessageLoadBalancingType getLoadBalancingType();
 
+   RoutingContext setServerSession(ServerSession session);
+
+   ServerSession getServerSession();
+
 
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index eff638f1e0..f238a4925d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -110,6 +110,8 @@ public interface ServerSession extends SecurityAuth {
 
    void addCloseable(Closeable closeable);
 
+   boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception;
+
    ServerConsumer createConsumer(long consumerID,
                                  SimpleString queueName,
                                  SimpleString filterString,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index c047171154..1220190d12 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -30,6 +29,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.server.mirror.MirrorController;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -65,10 +65,10 @@ public class RoutingContextImpl implements RoutingContext {
 
    boolean mirrorDisabled = false;
 
-   private final Executor executor;
-
    private boolean duplicateDetection = true;
 
+   private ServerSession serverSession;
+
    @Override
    public boolean isDuplicateDetection() {
       return duplicateDetection;
@@ -81,12 +81,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    public RoutingContextImpl(final Transaction transaction) {
-      this(transaction, null);
-   }
-
-   public RoutingContextImpl(final Transaction transaction, Executor executor) {
       this.transaction = transaction;
-      this.executor = executor;
    }
 
    @Override
@@ -121,7 +116,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public RoutingContext setReusable(boolean reusable) {
+   public RoutingContextImpl setReusable(boolean reusable) {
       if (this.reusable != null && !this.reusable.booleanValue()) {
          // cannot set to Reusable once it was set to false
          return this;
@@ -131,7 +126,7 @@ public class RoutingContextImpl implements RoutingContext {
       return this;
    }
    @Override
-   public RoutingContext setReusable(boolean reusable, int previousBindings) {
+   public RoutingContextImpl setReusable(boolean reusable, int previousBindings) {
       this.version = previousBindings;
       this.previousAddress = address;
       this.previousRoutingType = routingType;
@@ -144,7 +139,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public RoutingContext clear() {
+   public RoutingContextImpl clear() {
       map.clear();
 
       queueCount = 0;
@@ -252,7 +247,7 @@ public class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
-   public RoutingContext setRoutingType(RoutingType routingType) {
+   public RoutingContextImpl setRoutingType(RoutingType routingType) {
       if (this.routingType == null && routingType != null || this.routingType != routingType) {
          this.clear();
       }
@@ -313,6 +308,17 @@ public class RoutingContextImpl implements RoutingContext {
       return getContextListing(address).getDurableQueues();
    }
 
+   @Override
+   public RoutingContextImpl setServerSession(ServerSession session) {
+      this.serverSession = session;
+      return this;
+   }
+
+   @Override
+   public ServerSession getServerSession() {
+      return serverSession;
+   }
+
    @Override
    public int getQueueCount() {
       return queueCount;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 67201d3c7e..ab77a89f61 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.json.JsonArrayBuilder;
 import org.apache.activemq.artemis.json.JsonObjectBuilder;
 import java.security.cert.X509Certificate;
@@ -169,7 +172,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    private final SimpleString managementAddress;
 
-   protected final RoutingContext routingContext = new RoutingContextImpl(null);
+   protected final RoutingContext routingContext = new RoutingContextImpl(null).setServerSession(this);
 
    protected final SessionCallback callback;
 
@@ -1737,6 +1740,55 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       return tx;
    }
 
+
+   @Override
+   public boolean checkAutoCreate(SimpleString address, RoutingType routingType) throws Exception {
+      boolean result;
+      SimpleString unPrefixedAddress = removePrefix(address);
+      AddressSettings addressSettings =  server.getAddressSettingsRepository().getMatch(unPrefixedAddress.toString());
+
+      if (routingType == RoutingType.MULTICAST) {
+         if (server.getAddressInfo(unPrefixedAddress) == null) {
+            if (addressSettings.isAutoCreateAddresses()) {
+               try {
+                  createAddress(address, routingType, true);
+               } catch (ActiveMQAddressExistsException e) {
+                  // The address may have been created by another thread in the mean time.  Catch and do nothing.
+               }
+               result = true;
+            } else {
+               result = false;
+            }
+         } else {
+            result = true;
+         }
+      } else if (routingType == RoutingType.ANYCAST) {
+         if (server.locateQueue(unPrefixedAddress) == null) {
+            Bindings bindings = server.getPostOffice().lookupBindingsForAddress(address);
+            if (bindings != null) {
+               // this means the address has another queue with a different name, which is fine, we just ignore it on this case
+               result = true;
+            } else if (addressSettings.isAutoCreateQueues()) {
+               try {
+                  createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
+               } catch (ActiveMQQueueExistsException e) {
+                  // The queue may have been created by another thread in the mean time.  Catch and do nothing.
+               }
+               result = true;
+            } else {
+               result = false;
+            }
+         } else {
+            result = true;
+         }
+      } else {
+         result = true;
+      }
+
+      return result;
+   }
+
+
    @Override
    public RoutingStatus send(final Message message, final boolean direct) throws Exception {
       return send(message, direct, false);
@@ -2218,6 +2270,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          result = postOffice.route(msg, routingContext, direct);
 
+         logger.debug("Routing result for {} = {}", msg, result);
+
          Pair<Object, AtomicLong> value = targetAddressInfos.get(msg.getAddressSimpleString());
 
          if (value == null) {
@@ -2231,6 +2285,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             routingContext.clear();
          }
       }
+
       return result;
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
index f98ea8067c..71f397aad5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpExpiredMessageTest.java
@@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.lang.invoke.MethodHandles;
 import java.util.HashMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -49,9 +50,13 @@ import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
 
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    @Test(timeout = 60000)
    public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
       AmqpClient client = createAmqpClient();
@@ -568,7 +573,11 @@ public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
          message.setText("Test-Message");
          message.setDeliveryAnnotation("shouldDisappear", 1);
          message.setMessageAnnotation("x-opt-routing-type", (byte) 1);
+
+         logger.debug("*******************************************************************************************************************************");
+         logger.debug("message being sent {}", message);
          sender.send(message);
+         logger.debug("*******************************************************************************************************************************");
 
          Queue forward = getProxyToQueue(FORWARDING_ADDRESS);
          assertTrue("Message not diverted", Wait.waitFor(() -> forward.getMessageCount() > 0, 7000, 500));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java
new file mode 100644
index 0000000000..dece936305
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/DeleteAddressTest.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeleteAddressTest extends ActiveMQTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   ActiveMQServer server;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   private void localServer(boolean autoCreate) throws Exception {
+      server = createServer(false, true);
+
+      AddressSettings settings = new AddressSettings().setAutoDeleteAddresses(autoCreate).setAutoCreateAddresses(autoCreate).setAutoCreateQueues(autoCreate).setAutoDeleteQueues(autoCreate).setDeadLetterAddress(SimpleString.toSimpleString("DLQ")).setSendToDLAOnNoRoute(true);
+      server.start();
+      server.createQueue(new QueueConfiguration("DLQ").setRoutingType(RoutingType.ANYCAST));
+      server.getAddressSettingsRepository().addMatch(getName() + "*", settings);
+   }
+
+   @Test
+   public void testQueueNoAutoCreateCore() throws Exception {
+      internalQueueTest("CORE", false);
+   }
+
+   @Test
+   public void testQueueNoAutoCreateAMQP() throws Exception {
+      internalQueueTest("AMQP", false);
+   }
+
+   @Test
+   public void testQueueNoAutoCreateOpenWire() throws Exception {
+      internalQueueTest("OPENWIRE", false);
+   }
+
+
+   @Test
+   public void testQueueAutoCreateCore() throws Exception {
+      internalQueueTest("CORE", true);
+   }
+
+   @Test
+   public void testDeletoAutoCreateAMQP() throws Exception {
+      internalQueueTest("AMQP", true);
+   }
+
+   @Test
+   public void testQueueAutoCreateOpenWire() throws Exception {
+      internalQueueTest("OPENWIRE", true);
+   }
+
+   public void internalQueueTest(String protocol, boolean autocreate) throws Exception {
+      localServer(autocreate);
+
+      String ADDRESS_NAME = getName() + protocol;
+
+      if (!autocreate) {
+         server.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.ANYCAST));
+         server.createQueue(new QueueConfiguration(ADDRESS_NAME).setRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      }
+
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         Queue queue = session.createQueue(ADDRESS_NAME);
+         MessageProducer producer = session.createProducer(queue);
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+         connection.start();
+
+         try (MessageConsumer consumer = session.createConsumer(queue)) {
+            logger.debug("Sending hello message");
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("hello", message.getText());
+         }
+
+         session.commit();
+
+         org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(ADDRESS_NAME);
+         Wait.assertEquals(0, serverQueue::getConsumerCount);
+
+         server.destroyQueue(SimpleString.toSimpleString(ADDRESS_NAME));
+
+         boolean exception = false;
+         try {
+            logger.debug("Sending good bye message");
+            producer.send(session.createTextMessage("good bye"));
+            session.commit();
+            logger.debug("Exception was not captured, sent went fine");
+         } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+            exception = true;
+         }
+
+         if (!autocreate) {
+            Assert.assertTrue(exception);
+         }
+
+         if (autocreate) {
+            logger.debug("creating consumer");
+            try (MessageConsumer consumer = session.createConsumer(queue)) {
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               Assert.assertNotNull(message);
+               Assert.assertEquals("good bye", message.getText());
+            }
+         } else {
+            exception = false;
+            logger.debug("Creating consumer, where an exception is expected");
+            try (MessageConsumer consumer = session.createConsumer(queue)) {
+            } catch (Exception e) {
+               logger.debug("Received exception after createConsumer");
+               exception = true;
+            }
+            Assert.assertTrue(exception);
+         }
+      }
+
+      org.apache.activemq.artemis.core.server.Queue dlqServerQueue = server.locateQueue("DLQ");
+      Assert.assertEquals(0, dlqServerQueue.getMessageCount());
+   }
+
+   @Test
+   public void testTopicNoAutoCreateCore() throws Exception {
+      internalMulticastTest("CORE", false);
+   }
+
+   @Test
+   public void testTopicAutoCreateCore() throws Exception {
+      internalMulticastTest("CORE", true);
+   }
+
+   @Test
+   public void testTopicNoAutoCreateAMQP() throws Exception {
+      internalMulticastTest("AMQP", false);
+   }
+
+   @Test
+   public void testTopicAutoCreateAMQP() throws Exception {
+      internalMulticastTest("AMQP", true);
+   }
+
+   @Test
+   public void testTopicNoAutoCreateOPENWIRE() throws Exception {
+      internalMulticastTest("OPENWIRE", false);
+   }
+
+   @Test
+   public void testTopicAutoCreateOPENWIRE() throws Exception {
+      internalMulticastTest("OPENWIRE", true);
+   }
+
+   public void internalMulticastTest(String protocol, boolean autocreate) throws Exception {
+      localServer(autocreate);
+
+      String ADDRESS_NAME = getName() + protocol + "_Topic";
+      final String dlqText = "This should be in DLQ " + RandomUtil.randomString();
+
+      if (!autocreate) {
+         server.addAddressInfo(new AddressInfo(ADDRESS_NAME).addRoutingType(RoutingType.MULTICAST));
+      }
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("client");
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Topic destination = session.createTopic(ADDRESS_NAME);
+
+         TopicSubscriber consumer = session.createDurableSubscriber(destination, "subs1");
+
+         MessageProducer producer = session.createProducer(destination);
+         producer.send(session.createTextMessage("hello"));
+         session.commit();
+         connection.start();
+
+         logger.debug("Sending hello message");
+         TextMessage message = (TextMessage) consumer.receive(5000);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("hello", message.getText());
+
+         consumer.close();
+
+         session.commit();
+
+         Bindings bindings = server.getPostOffice().lookupBindingsForAddress(SimpleString.toSimpleString(ADDRESS_NAME));
+         for (Binding b : bindings.getBindings()) {
+            if (b instanceof LocalQueueBinding) {
+               Wait.assertEquals(0, () -> ((LocalQueueBinding)b).getQueue().getConsumerCount());
+               server.destroyQueue(b.getUniqueName());
+            }
+         }
+
+         producer.send(session.createTextMessage(dlqText));
+         session.commit();
+
+         server.removeAddressInfo(SimpleString.toSimpleString(ADDRESS_NAME), null);
+
+         try {
+            logger.debug("Sending good bye message");
+            producer.send(session.createTextMessage("good bye"));
+            logger.debug("Exception was not captured, sent went fine");
+            if (!autocreate) {
+               session.commit();
+               Assert.fail("Exception was expected");
+            } else {
+               session.rollback();
+            }
+         } catch (Exception e) {
+            logger.debug(e.getMessage(), e);
+         }
+
+         logger.debug("creating consumer");
+         try (TopicSubscriber newSubs = session.createDurableSubscriber(destination, "second")) {
+            if (!autocreate) {
+               Assert.fail("exception was expected");
+            }
+         } catch (Exception expected) {
+            logger.debug(expected.getMessage(), expected);
+         }
+
+         org.apache.activemq.artemis.core.server.Queue dlqServerQueue = server.locateQueue("DLQ");
+         Assert.assertEquals(1, dlqServerQueue.getMessageCount());
+      }
+
+      try (Connection connection = factory.createConnection()) {
+         connection.setClientID("client");
+         connection.start();
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+         MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("DLQ"));
+         TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
+         Assert.assertNotNull(dlqMessage);
+         Assert.assertEquals(dlqText, dlqMessage.getText());
+         Assert.assertNull(dlqConsumer.receiveNoWait());
+      }
+
+
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index b1e96713c5..5914d62b3a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -272,6 +272,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
       Message clientFile = createLargeClientMessageStreaming(session, messageSize, true);
 
+      logger.debug("****** Send message");
       producer.send(clientFile);
 
       session.commit();
@@ -292,7 +293,7 @@ public class LargeMessageTest extends LargeMessageTestBase {
          msg1.getBodyBuffer().readByte();
          Assert.fail("Exception was expected");
       } catch (final Exception ignored) {
-         // empty on purpose
+         logger.debug(ignored.getMessage(), ignored);
       }
 
       session.close();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
index 222061123b..576d5499ba 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java
@@ -636,6 +636,7 @@ public class AddressControlTest extends ManagementTestBase {
       session.createAddress(address, RoutingType.ANYCAST, false);
 
       AddressControl addressControl = createManagementControl(address);
+      Assert.assertNotNull(addressControl);
       assertEquals(0, addressControl.getMessageCount());
 
       ClientProducer producer = session.createProducer(address.toString());


[activemq-artemis] 04/13: ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 60c5b9871e14d75e8a535640d4824c365a8dfefa
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Nov 10 07:51:30 2022 -0500

    ARTEMIS-4083 ClientLargeMessage Streaming not closing inputStream if compressed
    
    (cherry picked from commit f2e0f8713fb984c0c9b222a1e44470cff4606cf6)
---
 .../core/client/impl/ClientProducerImpl.java       |  3 +-
 .../tests/integration/client/LargeMessageTest.java | 46 ++++++++++++++++------
 2 files changed, 36 insertions(+), 13 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
index bdb3a02223..25e5f7a372 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java
@@ -459,7 +459,8 @@ public class ClientProducerImpl implements ClientProducerInternal {
                }
 
                pos += numberOfBytesRead;
-            } while (pos < minLargeMessageSize);
+            }
+            while (pos < minLargeMessageSize);
 
             totalSize += pos;
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
index 2e00436088..b1e96713c5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/LargeMessageTest.java
@@ -2795,27 +2795,36 @@ public class LargeMessageTest extends LargeMessageTestBase {
    }
 
    @Test
-   public void testStream() throws Exception {
+   public void testStreamedMessage() throws Exception {
+      testStream(false);
+   }
+
+   @Test
+   public void testStreamedMessageCompressed() throws Exception {
+      testStream(true);
+   }
+
+   private void testStream(boolean compressed) throws Exception {
       ActiveMQServer server = createServer(true, isNetty(), storeType);
 
       server.start();
 
-      locator.setCompressLargeMessage(true);
+      locator.setCompressLargeMessage(compressed);
 
       ClientSessionFactory sf = addSessionFactory(createSessionFactory(locator));
 
       ClientSession session = sf.createSession(false, false);
 
-      final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
-
-      server.createQueue(new QueueConfiguration(MY_QUEUE).setRoutingType(RoutingType.ANYCAST));
+      session.createQueue(new QueueConfiguration(ADDRESS));
 
-      ClientProducer producer = session.createProducer(MY_QUEUE);
+      ClientProducer producer = session.createProducer(ADDRESS);
 
       AtomicBoolean closed = new AtomicBoolean(false);
 
+      final int BYTES = 1_000;
+
       InputStream inputStream = new InputStream() {
-         int bytes = 10_000;
+         int bytes = BYTES;
          @Override
          public int read() throws IOException {
             if (bytes-- > 0) {
@@ -2830,12 +2839,10 @@ public class LargeMessageTest extends LargeMessageTestBase {
             closed.set(true);
          }
 
-
          @Override
-         public int available () throws IOException {
+         public int available() {
             return bytes;
          }
-
       };
 
       ClientMessage message = session.createMessage(true);
@@ -2844,11 +2851,26 @@ public class LargeMessageTest extends LargeMessageTestBase {
 
       Wait.assertTrue(closed::get);
 
-      session.close();
+      session.commit();
 
-   }
+      session.start();
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
 
+      ClientMessage receivedMessage = consumer.receive(5000);
+      Assert.assertNotNull(receivedMessage);
 
+      ActiveMQBuffer buffer = receivedMessage.getBodyBuffer();
+      Assert.assertEquals(BYTES, buffer.readableBytes());
 
+      for (int i = 0; i < BYTES; i++) {
+         Assert.assertEquals((byte)10, buffer.readByte());
+      }
+
+      Assert.assertEquals(0, buffer.readableBytes());
+
+      session.close();
+
+   }
 
 }
\ No newline at end of file


[activemq-artemis] 08/13: ARTEMIS-4078 Fix divert reloading

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 64918caa100cfb00b23e55ada5b3eb30a3f6c677
Author: Å mucr Jan <ja...@aimtec.cz>
AuthorDate: Tue Nov 1 14:00:18 2022 +0100

    ARTEMIS-4078 Fix divert reloading
    
    Reloading has been fixed for divert:
    * filter
    * address
    * exclusive
    
    Source address and exclusivity changes require divert redeployment.
    
    (cherry picked from commit 43824fc494e6ed67a16a5512865362f74bacc953)
---
 .../core/server/impl/ActiveMQServerImpl.java       |  31 ++-
 .../tests/integration/jms/RedeployTest.java        | 298 +++++++++++++++++----
 .../resources/reload-divert-address-source1.xml    |  64 +++++
 .../resources/reload-divert-address-source2.xml    |  64 +++++
 .../resources/reload-divert-address-target1.xml    |  64 +++++
 .../resources/reload-divert-address-target2.xml    |  64 +++++
 .../src/test/resources/reload-divert-exclusive.xml |  63 +++++
 .../test/resources/reload-divert-filter-none.xml   |  63 +++++
 .../test/resources/reload-divert-filter-x-eq-x.xml |  64 +++++
 .../test/resources/reload-divert-filter-x-eq-y.xml |  64 +++++
 .../test/resources/reload-divert-non-exclusive.xml |  63 +++++
 11 files changed, 845 insertions(+), 57 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index a0e499ca13..af278ebfd9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2867,8 +2867,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       final Divert divert = divertBinding.getDivert();
 
       Filter filter = FilterImpl.createFilter(config.getFilterString());
-      if (filter != null && !filter.equals(divert.getFilter())) {
-         divert.setFilter(filter);
+      if (filter == null) {
+         divert.setFilter(null);
+      } else {
+         if (!filter.equals(divert.getFilter())) {
+            divert.setFilter(filter);
+         }
       }
 
       if (config.getTransformerConfiguration() != null) {
@@ -2880,8 +2884,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       if (config.getForwardingAddress() != null) {
          SimpleString forwardAddress = SimpleString.toSimpleString(config.getForwardingAddress());
-
-         if (!forwardAddress.equals(config)) {
+         if (!forwardAddress.equals(divert.getForwardAddress())) {
             divert.setForwardAddress(forwardAddress);
          }
       }
@@ -4445,16 +4448,34 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          recoverStoredAddressSettings();
 
          ActiveMQServerLogger.LOGGER.reloadingConfiguration("diverts");
+         // Filter out all active diverts
          final Set<SimpleString> divertsToRemove = postOffice.getAllBindings()
                  .filter(binding -> binding instanceof DivertBinding)
                  .map(Binding::getUniqueName)
                  .collect(Collectors.toSet());
+         // Go through the currently configured diverts
          for (DivertConfiguration divertConfig : configuration.getDivertConfigurations()) {
+            // Retain diverts still configured to exist
             divertsToRemove.remove(SimpleString.toSimpleString(divertConfig.getName()));
-            if (postOffice.getBinding(new SimpleString(divertConfig.getName())) == null) {
+            // Deploy newly added diverts, reconfigure existing
+            final SimpleString divertName = new SimpleString(divertConfig.getName());
+            final DivertBinding divertBinding = (DivertBinding) postOffice.getBinding(divertName);
+            if (divertBinding == null) {
                deployDivert(divertConfig);
+            } else {
+               if ((divertBinding.isExclusive() != divertConfig.isExclusive()) ||
+                       !divertBinding.getAddress().toString().equals(divertConfig.getAddress())) {
+                  // Diverts whose exclusivity or address has changed have to be redeployed.
+                  // See the Divert interface and look for setters. Absent setter is a hint that maybe that property is immutable.
+                  destroyDivert(divertName);
+                  deployDivert(divertConfig);
+               } else {
+                  // Diverts with their exclusivity and address unchanged can be updated directly.
+                  updateDivert(divertConfig);
+               }
             }
          }
+         // Remove all remaining diverts
          for (final SimpleString divertName : divertsToRemove) {
             try {
                destroyDivert(divertName);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index c893d7514d..035772364a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.activemq.artemis.tests.integration.jms;
 
+import java.io.InputStream;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -51,6 +53,7 @@ import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancing
 import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
 import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.server.reload.ReloadManager;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
@@ -566,31 +569,28 @@ public class RedeployTest extends ActiveMQTestBase {
       }
    }
 
-   private void deployBrokerConfig(EmbeddedActiveMQ server, URL configFile) throws Exception {
-
+   private void deployBrokerConfig(EmbeddedActiveMQ server, String configFileName) throws Exception {
       Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
-      Files.copy(configFile.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
-      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
-      final ReusableLatch latch = new ReusableLatch(1);
-      Runnable tick = latch::countDown;
-      server.getActiveMQServer().getReloadManager().setTick(tick);
-
-      latch.await(10, TimeUnit.SECONDS);
+      final ReloadManager reloadManager = server.getActiveMQServer().getReloadManager();
+      final boolean reloadManagerOriginallyStarted = reloadManager.isStarted();
+      try {
+         reloadManager.stop();
+         final URL configFile = RedeployTest.class.getClassLoader().getResource(configFileName);
+         assertNotNull(configFile);
+         try (InputStream configStream = configFile.openStream()) {
+            Files.copy(configStream, brokerXML, StandardCopyOption.REPLACE_EXISTING);
+         }
+         server.getActiveMQServer().reloadConfigurationFile();
+      } finally {
+         if (reloadManagerOriginallyStarted) {
+            reloadManager.start();
+         }
+      }
    }
 
-   private void doTestRemoveFilter(URL testConfiguration) throws Exception {
-
-      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
-
-      URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
-
-      Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
+   private void doTestQueueRemoveFilter(String testConfigurationFileName) throws Exception {
 
-      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
-      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
-      embeddedActiveMQ.start();
-
-      deployBrokerConfig(embeddedActiveMQ, baseConfig);
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-queue-filter.xml");
 
       try {
 
@@ -628,7 +628,7 @@ public class RedeployTest extends ActiveMQTestBase {
             consumer.close();
          }
 
-         deployBrokerConfig(embeddedActiveMQ, testConfiguration);
+         deployBrokerConfig(embeddedActiveMQ, testConfigurationFileName);
 
          try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
               Connection connection = factory.createConnection();
@@ -666,26 +666,20 @@ public class RedeployTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testRedeployRemoveFilter() throws Exception {
-      doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated-empty.xml"));
-      doTestRemoveFilter(RedeployTest.class.getClassLoader().getResource("reload-queue-filter-removed.xml"));
+   public void testRedeployRemoveQueueFilter() throws Exception {
+      doTestQueueRemoveFilter("reload-queue-filter-updated-empty.xml");
+      doTestQueueRemoveFilter("reload-queue-filter-removed.xml");
    }
 
    /**
     * This one is here just to make sure it's possible to change queue parameters one by one without setting the others
     * to <code>null</code>.
-    * @throws Exception
+    * @throws Exception An exception.
     */
    @Test
    public void testQueuePartialReconfiguration() throws Exception {
 
-      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
-      URL url = RedeployTest.class.getClassLoader().getResource("reload-empty.xml");
-      Files.copy(url.openStream(), brokerXML);
-
-      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
-      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
-      embeddedActiveMQ.start();
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-empty.xml");
 
       try {
 
@@ -708,13 +702,7 @@ public class RedeployTest extends ActiveMQTestBase {
    @Test
    public void testRedeployQueueDefaults() throws Exception {
 
-      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
-      URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-before.xml");
-      URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-after.xml");
-      Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
-      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
-      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
-      embeddedActiveMQ.start();
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-queue-defaults-before.xml");
 
       try {
          LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
@@ -736,7 +724,7 @@ public class RedeployTest extends ActiveMQTestBase {
          assertEquals(new SimpleString("jdoe"), queue.getUser());
          assertNotEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
 
-         deployBrokerConfig(embeddedActiveMQ, newConfig);
+         deployBrokerConfig(embeddedActiveMQ, "reload-queue-defaults-after.xml");
 
          assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
          assertEquals(RoutingType.MULTICAST, queue.getRoutingType());
@@ -761,21 +749,15 @@ public class RedeployTest extends ActiveMQTestBase {
    @Test
    public void testUndeployDivert() throws Exception {
 
-      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
-      URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-before.xml");
-      URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-after.xml");
-      Files.copy(baseConfig.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
-      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
-      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
-      embeddedActiveMQ.start();
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-undeploy-before.xml");
 
       try {
          DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
                  .getBinding(new SimpleString("divert"));
          assertNotNull(divertBinding);
 
-         Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
-         Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
+         final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
+         final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
 
          try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
               Connection connection = factory.createConnection();
@@ -785,13 +767,13 @@ public class RedeployTest extends ActiveMQTestBase {
               MessageConsumer targetConsumer = session.createConsumer(targetQueue)) {
 
             connection.start();
-            Message message = session.createTextMessage("Hello world");
+            final Message message = session.createTextMessage("Hello world");
             sourceProducer.send(message);
             assertNotNull(sourceConsumer.receive(2000));
             assertNotNull(targetConsumer.receive(2000));
          }
 
-         deployBrokerConfig(embeddedActiveMQ, newConfig);
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-undeploy-after.xml");
 
          Wait.waitFor(() -> embeddedActiveMQ.getActiveMQServer().getPostOffice()
                          .getBinding(new SimpleString("divert")) == null);
@@ -817,6 +799,218 @@ public class RedeployTest extends ActiveMQTestBase {
       }
    }
 
+   private void sendDivertedTestMessage(Queue queue, Queue forwardingQueue, boolean shouldReceiveFromQueue, boolean shouldReceiveFromForwardingQueue, Map<String, String> properties) throws JMSException {
+      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+           Connection connection = factory.createConnection();
+           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+           MessageProducer queueProducer = session.createProducer(queue);
+           MessageConsumer queueConsumer = session.createConsumer(queue);
+           MessageConsumer forwardingQueueConsumer = session.createConsumer(forwardingQueue)) {
+
+         connection.start();
+         final Message message = session.createTextMessage("Hello world");
+         for (Map.Entry<String, String> entry : properties.entrySet()) {
+            message.setStringProperty(entry.getKey(), entry.getValue());
+         }
+         queueProducer.send(message);
+
+         final Message queueMessage = queueConsumer.receive(2000);
+         final Message forwardingQueueMessage = forwardingQueueConsumer.receive(2000);
+         if (shouldReceiveFromQueue) {
+            assertNotNull("A message should have been received from the '" + queue.getQueueName() + "' queue.", queueMessage);
+         } else {
+            assertNull("No message should have been received from the '" + queue.getQueueName() + "' queue.", queueMessage);
+         }
+         if (shouldReceiveFromForwardingQueue) {
+            assertNotNull("A message should have been received from the '" + forwardingQueue.getQueueName() + "' forwarding queue.", forwardingQueueMessage);
+         } else {
+            assertNull("No message should have been received from the '" + forwardingQueue.getQueueName() + "' forwarding queue.", forwardingQueueMessage);
+         }
+      }
+   }
+
+   private EmbeddedActiveMQ createEmbeddedActiveMQServer(String initialConfigFileName) throws Exception {
+      final Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      final URL baseConfig = RedeployTest.class.getClassLoader().getResource(initialConfigFileName);
+      assertNotNull(baseConfig);
+      try (InputStream configStream = baseConfig.openStream()) {
+         Files.copy(configStream, brokerXML, StandardCopyOption.REPLACE_EXISTING);
+      }
+      final EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.start();
+      waitForServerToStart(embeddedActiveMQ.getActiveMQServer());
+      return embeddedActiveMQ;
+   }
+
+   @Test
+   public void testAddDivertFilter() throws Exception {
+
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-filter-none.xml");
+
+      final SimpleString divertName = new SimpleString("source-to-target");
+      final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
+      final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
+      final Map<String, String> emptyTestMessageProperties = Map.of();
+      final Map<String, String> testMessagePropertiesXX = Map.of("x", "x");
+
+      try {
+         DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertNull("The divert '" + divertName + "' should have no filter applied at first.", divertBinding.getFilter());
+
+         assertNull(embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(new SimpleString("foo")));
+
+         // Message with no properties should be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, emptyTestMessageProperties);
+         // Message with properties should be diverted too.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX);
+
+         // Add filter
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-filter-x-eq-x.xml");
+
+         divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertNotNull("The divert '" + divertName + "' should have a filter applied after the new configuration is loaded.", divertBinding.getFilter());
+
+         // Message with no properties SHOULD NOT be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, true, false, emptyTestMessageProperties);
+         // Message with property x == "x" SHOULD be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX);
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   @Test
+   public void testRemoveDivertFilter() throws Exception {
+
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-filter-x-eq-x.xml");
+      final SimpleString divertName = new SimpleString("source-to-target");
+      final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
+      final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
+      final Map<String, String> emptyTestMessageProperties = Map.of();
+      final Map<String, String> testMessagePropertiesXX = Map.of("x", "x");
+
+      try {
+         DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertNotNull("The divert '" + divertName + "' should have a filter applied at first.", divertBinding.getFilter());
+
+         // Message with no properties SHOULD NOT be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, true, false, emptyTestMessageProperties);
+         // Message with property x == "x" SHOULD be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX);
+
+         // Remove filter
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-filter-none.xml");
+
+         divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertNull("The divert '" + divertName + "' should not have a filter applied after the new configuration is loaded.", divertBinding.getFilter());
+
+         // Message with no properties should be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, emptyTestMessageProperties);
+         // Message with properties should be diverted too.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX);
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   @Test
+   public void testChangeDivertFilter() throws Exception {
+
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-filter-x-eq-x.xml");
+      final SimpleString divertName = new SimpleString("source-to-target");
+      final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
+      final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
+      final Map<String, String> testMessagePropertiesXX = Map.of("x", "x");
+      final Map<String, String> testMessagePropertiesXY = Map.of("x", "y");
+
+      try {
+         DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertNotNull("The divert '" + divertName + "' should have a filter applied after the first configuration file is loaded.", divertBinding.getFilter());
+
+         // Message with property x == "x" SHOULD be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXX);
+         // Message with property x == "y" SHOULD NOT be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, true, false, testMessagePropertiesXY);
+
+         // Update filter
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-filter-x-eq-y.xml");
+
+         divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertNotNull("The divert '" + divertName + "' should have a filter applied after the second configuration file is loaded.", divertBinding.getFilter());
+
+         // Message with property x == "x" SHOULD NOT be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, true, false, testMessagePropertiesXX);
+         // Message with property x == "y" SHOULD be diverted.
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, testMessagePropertiesXY);
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   @Test
+   public void testChangeDivertExclusivity() throws Exception {
+
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-exclusive.xml");
+      final SimpleString divertName = new SimpleString("source-to-target");
+      final Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
+      final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
+
+      try {
+         DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+
+         // Message should be routed to the forwarding queue only
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, Map.of());
+
+         // Route to both queues
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-non-exclusive.xml");
+         sendDivertedTestMessage(sourceQueue, targetQueue, true, true, Map.of());
+
+         // Route to the forwarding queue only
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-exclusive.xml");
+         sendDivertedTestMessage(sourceQueue, targetQueue, false, true, Map.of());
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   @Test
+   public void testChangeDivertAddress() throws Exception {
+
+      final EmbeddedActiveMQ embeddedActiveMQ = createEmbeddedActiveMQServer("reload-divert-address-source1.xml");
+      final SimpleString divertName = new SimpleString("source-to-target");
+      final Queue sourceQueue1 = (Queue) ActiveMQDestination.createDestination("queue://source1", ActiveMQDestination.TYPE.QUEUE);
+      final Queue sourceQueue2 = (Queue) ActiveMQDestination.createDestination("queue://source2", ActiveMQDestination.TYPE.QUEUE);
+      final Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
+
+      try {
+         DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertEquals("Divert '" + divertName + "' address should be '" + sourceQueue1.getQueueName() + "'.", sourceQueue1.getQueueName(), divertBinding.getAddress().toString());
+         sendDivertedTestMessage(sourceQueue1, targetQueue, false, true, Map.of());
+         sendDivertedTestMessage(sourceQueue2, targetQueue, true, false, Map.of());
+
+         deployBrokerConfig(embeddedActiveMQ, "reload-divert-address-source2.xml");
+
+         divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(divertName);
+         assertNotNull("Divert '" + divertName + "' binding should exist.", divertBinding);
+         assertEquals("Divert '" + divertName + "' address should have been updated to '" + sourceQueue2.getQueueName() + "'.", sourceQueue2.getQueueName(), divertBinding.getAddress().toString());
+         sendDivertedTestMessage(sourceQueue1, targetQueue, true, false, Map.of());
+         sendDivertedTestMessage(sourceQueue2, targetQueue, false, true, Map.of());
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   // TODO: Test divert transformers: add, change, remove
+
    @Test
    public void testRedeployWithFailover() throws Exception {
       Set<Role> original = new HashSet<>();
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml
new file mode 100644
index 0000000000..a055523c80
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-address-source1.xml
@@ -0,0 +1,64 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source1"/>
+         <address name="source2"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source1</address>
+            <forwarding-address>target</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml
new file mode 100644
index 0000000000..fbb08005a0
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-address-source2.xml
@@ -0,0 +1,64 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source1"/>
+         <address name="source2"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source2</address>
+            <forwarding-address>target</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml
new file mode 100644
index 0000000000..220fee04eb
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-address-target1.xml
@@ -0,0 +1,64 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target1"/>
+         <address name="target2"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target1</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml
new file mode 100644
index 0000000000..1b3bf2c858
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-address-target2.xml
@@ -0,0 +1,64 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target1"/>
+         <address name="target2"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target2</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml
new file mode 100644
index 0000000000..c8a0054be2
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-exclusive.xml
@@ -0,0 +1,63 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml
new file mode 100644
index 0000000000..c8a0054be2
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-filter-none.xml
@@ -0,0 +1,63 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target</forwarding-address>
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml
new file mode 100644
index 0000000000..e748824490
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-x.xml
@@ -0,0 +1,64 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target</forwarding-address>
+            <filter string="x = 'x'" />
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml
new file mode 100644
index 0000000000..78519664f2
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-filter-x-eq-y.xml
@@ -0,0 +1,64 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target</forwarding-address>
+            <filter string="x = 'y'" />
+            <exclusive>true</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>
diff --git a/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml
new file mode 100644
index 0000000000..df5c5a9836
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-divert-non-exclusive.xml
@@ -0,0 +1,63 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+
+      <name>0.0.0.0</name>
+
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
+
+      <persistence-enabled>false</persistence-enabled>
+
+      <security-enabled>false</security-enabled>
+
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./target/data/paging</paging-directory>
+
+      <bindings-directory>./target/data/bindings</bindings-directory>
+
+      <journal-directory>./target/data/journal</journal-directory>
+
+      <large-messages-directory>./target/data/large-messages</large-messages-directory>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="source"/>
+         <address name="target"/>
+      </addresses>
+
+      <diverts>
+         <divert name="source-to-target">
+            <address>source</address>
+            <forwarding-address>target</forwarding-address>
+            <exclusive>false</exclusive>
+         </divert>
+      </diverts>
+   </core>
+</configuration>


[activemq-artemis] 11/13: ARTEMIS-4092: resolve issues with upgrade backups, tweaks for clarity and consistency, additional output detail

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit ecbb0e16d3d52f94cbef9bb6803199e895cf8591
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Wed Nov 23 16:05:37 2022 +0000

    ARTEMIS-4092: resolve issues with upgrade backups, tweaks for clarity and consistency, additional output detail
    
    Adds test verifications that expected files are backed up and match pre-upgrade reference files.
    
    This closes #4291
    
    (cherry picked from commit 481d07f27e5c69b0b20c30ed7b24ca4a4b70d338)
---
 .../activemq/artemis/cli/commands/Create.java      |   6 +-
 .../activemq/artemis/cli/commands/Upgrade.java     | 147 ++++++++++++++-------
 tests/smoke-tests/pom.xml                          |  16 +++
 .../servers/windowsUpgrade/bin/artemis-service.exe |   1 +
 .../windowsUpgrade/bin/artemis-service.exe.config  |   6 +
 .../smoke/upgradeTest/CompareUpgradeTest.java      |  46 +++++++
 6 files changed, 170 insertions(+), 52 deletions(-)

diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index 0e703dec3f..122f0340d9 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -64,8 +64,10 @@ public class Create extends InstallAbstract {
 
    public static final String ARTEMIS_CMD = "artemis.cmd";
    public static final String BIN_ARTEMIS_CMD = "bin/" + ARTEMIS_CMD;
-   public static final String BIN_ARTEMIS_SERVICE_EXE = "bin/artemis-service.exe";
-   public static final String BIN_ARTEMIS_SERVICE_EXE_CONFIG = "bin/artemis-service.exe.config";
+   public static final String ARTEMIS_SERVICE_EXE = "artemis-service.exe";
+   public static final String BIN_ARTEMIS_SERVICE_EXE = "bin/" + ARTEMIS_SERVICE_EXE;
+   public static final String ARTEMIS_SERVICE_EXE_CONFIG = "artemis-service.exe.config";
+   public static final String BIN_ARTEMIS_SERVICE_EXE_CONFIG = "bin/" + ARTEMIS_SERVICE_EXE_CONFIG;
    public static final String ARTEMIS_SERVICE_XML = "artemis-service.xml";
    public static final String BIN_ARTEMIS_SERVICE_XML = "bin/" + ARTEMIS_SERVICE_XML;
    public static final String ETC_ARTEMIS_PROFILE_CMD = "artemis.profile.cmd";
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Upgrade.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Upgrade.java
index 052901802b..aac686ba0c 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Upgrade.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Upgrade.java
@@ -45,7 +45,7 @@ public class Upgrade extends InstallAbstract {
    // this is the prefix where we can find the JDK arguments in Linux script
    private static final String JDK_PREFIX_LINUX = "JAVA_ARGS=";
 
-   protected static final String OLD_LOGGING_PROPERTIES = "logging.properties";
+   public static final String OLD_LOGGING_PROPERTIES = "logging.properties";
 
    /**
     * Checks that the directory provided either exists and is writable or doesn't exist but can be created.
@@ -73,26 +73,28 @@ public class Upgrade extends InstallAbstract {
       context.out.println("*******************************************************************************************************************************");
       context.out.println("Upgrading broker instance " + directory + " to use artemis.home=" + getBrokerHome());
 
-      File bkpFolder = findBackup(context);
-      File binBkp = new File(bkpFolder, "bin");
-      File etcBkp = new File(bkpFolder, "etc");
-      File tmp = new File(bkpFolder, "tmp");
-      binBkp.mkdirs();
-      etcBkp.mkdirs();
-      tmp.mkdirs();
+      final File bkpFolder = findBackup(context);
 
-      File bin = new File(directory, "bin");
+      final File binBkp = new File(bkpFolder, "bin");
+      final File etcBkp = new File(bkpFolder, "etc");
+      final File tmp = new File(bkpFolder, "tmp");
+      Files.createDirectory(binBkp.toPath());
+      Files.createDirectory(etcBkp.toPath());
+      Files.createDirectory(tmp.toPath());
+
+      final File bin = new File(directory, "bin");
       File etcFolder = new File(directory, etc);
 
+      final File artemisCmdScript = new File(bin, Create.ARTEMIS_CMD);
+      final File artemisScript = new File(bin, Create.ARTEMIS);
+
       if (etc == null || etc.equals("etc")) {
          if (IS_WINDOWS && !IS_CYGWIN) {
-            File cmd = new File(bin, Create.ARTEMIS_CMD);
             String pattern = "set ARTEMIS_INSTANCE_ETC=";
-            etcFolder = getETC(context, etcFolder, cmd, pattern);
+            etcFolder = getETC(context, etcFolder, artemisCmdScript, pattern);
          } else {
-            File cmd = new File(bin, Create.ARTEMIS);
             String pattern = "ARTEMIS_INSTANCE_ETC=";
-            etcFolder = getETC(context, etcFolder, cmd, pattern);
+            etcFolder = getETC(context, etcFolder, artemisScript, pattern);
          }
       }
 
@@ -108,43 +110,81 @@ public class Upgrade extends InstallAbstract {
       Create.addScriptFilters(filters, getHome(), getInstance(), etcFolder, new File(getInstance(), "notUsed"), new File(getInstance(), "om-not-used.dmp"), javaMemory, javaOptions, "NA");
 
       if (IS_WINDOWS) {
-         // recreating the service.exe in case we ever upgrade it
+         // recreating the service.exe and config in case we ever upgrade it
+         final File serviceExe = new File(directory, Create.BIN_ARTEMIS_SERVICE_EXE);
+         final File serviceExeBkp = new File(bkpFolder, Create.BIN_ARTEMIS_SERVICE_EXE);
+
+         context.out.println("Copying " + serviceExe + " to " + serviceExeBkp);
+         Files.copy(serviceExe.toPath(), serviceExeBkp.toPath(), StandardCopyOption.REPLACE_EXISTING);
+
+         context.out.println("Updating " + serviceExe.toPath());
          write(Create.BIN_ARTEMIS_SERVICE_EXE, true);
+
+         final File serviceExeConfig = new File(directory, Create.BIN_ARTEMIS_SERVICE_EXE_CONFIG);
+         final File serviceExeConfigBkp = new File(bkpFolder, Create.BIN_ARTEMIS_SERVICE_EXE_CONFIG);
+         if (serviceExeConfig.exists()) {
+            // It didnt exist until more recently
+            context.out.println("Copying " + serviceExeConfig + " to " + serviceExeConfigBkp);
+            Files.copy(serviceExeConfig.toPath(), serviceExeConfigBkp.toPath(), StandardCopyOption.REPLACE_EXISTING);
+         }
+
+         context.out.println("Updating " + serviceExeConfig);
          write(Create.BIN_ARTEMIS_SERVICE_EXE_CONFIG, true);
 
-         write(Create.BIN_ARTEMIS_CMD, new File(tmp, Create.ARTEMIS_CMD), filters, false, false);
-         upgrade(new File(tmp, Create.ARTEMIS_CMD), new File(bin, Create.ARTEMIS_CMD), binBkp, "set ARTEMIS_INSTANCE_ETC=");
+         final File artemisCmdScriptTmp = new File(tmp, Create.ARTEMIS_CMD);
+         final File artemisCmdScriptBkp = new File(binBkp, Create.ARTEMIS_CMD);
 
-         write(Create.BIN_ARTEMIS_SERVICE_XML, new File(tmp, Create.ARTEMIS_SERVICE_XML), filters, false, false);
-         upgrade(new File(tmp, Create.ARTEMIS_SERVICE_XML), new File(bin, Create.ARTEMIS_SERVICE_XML), binBkp,
+         write(Create.BIN_ARTEMIS_CMD, artemisCmdScriptTmp, filters, false, false);
+         upgrade(context, artemisCmdScriptTmp, artemisCmdScript, artemisCmdScriptBkp, "set ARTEMIS_INSTANCE_ETC=");
+
+         final File serviceXmlTmp = new File(tmp, Create.ARTEMIS_SERVICE_XML);
+         final File serviceXml = new File(bin, Create.ARTEMIS_SERVICE_XML);
+         final File serviceXmlBkp = new File(binBkp, Create.ARTEMIS_SERVICE_XML);
+
+         write(Create.BIN_ARTEMIS_SERVICE_XML, serviceXmlTmp, filters, false, false);
+         upgrade(context, serviceXmlTmp, serviceXml, serviceXmlBkp,
                  "<env name=\"ARTEMIS_INSTANCE\"", "<env name=\"ARTEMIS_INSTANCE_ETC\"",
                  "<env name=\"ARTEMIS_INSTANCE_URI\"", "<env name=\"ARTEMIS_INSTANCE_ETC_URI\"",
                  "<env name=\"ARTEMIS_DATA_DIR\"", "<logpath>", "<startargument>-Xmx", "<stopargument>-Xmx",
                  "<name>", "<id>", "<startargument>-Dhawtio.role=");
 
-         write("etc/" + Create.ETC_ARTEMIS_PROFILE_CMD, new File(tmp, Create.ETC_ARTEMIS_PROFILE_CMD), filters, false, false);
-         upgradeJDK(JDK_PREFIX_WINDOWS, "", KEEPING_JVM_ARGUMENTS, new File(tmp, Create.ETC_ARTEMIS_PROFILE_CMD), new File(etcFolder, Create.ETC_ARTEMIS_PROFILE_CMD), binBkp,
+         final File artemisProfileCmdTmp = new File(tmp, Create.ETC_ARTEMIS_PROFILE_CMD);
+         final File artemisProfileCmd = new File(etcFolder, Create.ETC_ARTEMIS_PROFILE_CMD);
+         final File artemisProfileCmdBkp = new File(etcBkp, Create.ETC_ARTEMIS_PROFILE_CMD);
+
+         write("etc/" + Create.ETC_ARTEMIS_PROFILE_CMD, artemisProfileCmdTmp, filters, false, false);
+         upgradeJDK(context, JDK_PREFIX_WINDOWS, "", KEEPING_JVM_ARGUMENTS, artemisProfileCmdTmp, artemisProfileCmd, artemisProfileCmdBkp,
                     "set ARTEMIS_INSTANCE=\"", "set ARTEMIS_DATA_DIR=", "set ARTEMIS_ETC_DIR=", "set ARTEMIS_OOME_DUMP=", "set ARTEMIS_INSTANCE_URI=", "set ARTEMIS_INSTANCE_ETC_URI=");
       }
 
       if (!IS_WINDOWS || IS_CYGWIN) {
-         write(Create.BIN_ARTEMIS, new File(tmp, Create.ARTEMIS), filters, false, false);
-         upgrade(new File(tmp, Create.ARTEMIS), new File(bin, Create.ARTEMIS), binBkp, "ARTEMIS_INSTANCE_ETC=");
+         final File artemisScriptTmp = new File(tmp, Create.ARTEMIS);
+         final File artemisScriptBkp = new File(binBkp, Create.ARTEMIS);
 
-         write(Create.BIN_ARTEMIS_SERVICE, new File(tmp, Create.ARTEMIS_SERVICE), filters, false, false);
-         upgrade(new File(tmp, Create.ARTEMIS_SERVICE), new File(bin, Create.ARTEMIS_SERVICE), binBkp); // we replace the whole thing
+         write(Create.BIN_ARTEMIS, artemisScriptTmp, filters, false, false);
+         upgrade(context, artemisScriptTmp, artemisScript, artemisScriptBkp, "ARTEMIS_INSTANCE_ETC=");
+
+         final File artemisService = new File(bin, Create.ARTEMIS_SERVICE);
+         final File artemisServiceTmp = new File(tmp, Create.ARTEMIS_SERVICE);
+         final File artemisServiceBkp = new File(binBkp, Create.ARTEMIS_SERVICE);
+
+         write(Create.BIN_ARTEMIS_SERVICE, artemisServiceTmp, filters, false, false);
+         upgrade(context, artemisServiceTmp, artemisService, artemisServiceBkp); // we replace the whole thing
 
          write("etc/" + Create.ETC_ARTEMIS_PROFILE, new File(tmp, Create.ETC_ARTEMIS_PROFILE), filters, false, false);
-         upgradeJDK(JDK_PREFIX_LINUX, "\"", KEEPING_JVM_ARGUMENTS,
-                    new File(tmp, Create.ETC_ARTEMIS_PROFILE), new File(etcFolder, Create.ETC_ARTEMIS_PROFILE), etcBkp, "ARTEMIS_INSTANCE=",
+         upgradeJDK(context, JDK_PREFIX_LINUX, "\"", KEEPING_JVM_ARGUMENTS,
+                    new File(tmp, Create.ETC_ARTEMIS_PROFILE), new File(etcFolder, Create.ETC_ARTEMIS_PROFILE), new File(etcBkp, Create.ETC_ARTEMIS_PROFILE), "ARTEMIS_INSTANCE=",
                     "ARTEMIS_DATA_DIR=", "ARTEMIS_ETC_DIR=", "ARTEMIS_OOME_DUMP=", "ARTEMIS_INSTANCE_URI=", "ARTEMIS_INSTANCE_ETC_URI=", "HAWTIO_ROLE=");
       }
 
+      final File bootstrapXml = new File(etcFolder, Create.ETC_BOOTSTRAP_XML);
+      final File bootstrapXmlTmp = new File(tmp, Create.ETC_BOOTSTRAP_XML);
+      final File bootstrapXmlBkp = new File(etcBkp, Create.ETC_BOOTSTRAP_XML);
 
-      Files.copy( new File(etcFolder, Create.ETC_BOOTSTRAP_XML).toPath(), new File(tmp, Create.ETC_BOOTSTRAP_XML).toPath());
-      replaceLines(new File(tmp, Create.ETC_BOOTSTRAP_XML), new File(etcFolder, Create.ETC_BOOTSTRAP_XML), binBkp, "<web path", "   <web path=\"web\" rootRedirectLocation=\"console\">");
+      Files.copy(bootstrapXml.toPath(), bootstrapXmlTmp.toPath());
+      replaceLines(context, bootstrapXmlTmp, bootstrapXml, bootstrapXmlBkp, "<web path", "   <web path=\"web\" rootRedirectLocation=\"console\">");
 
-      upgradeLogging(context, etcBkp, etcFolder);
+      upgradeLogging(context, etcFolder, etcBkp);
 
       context.out.println();
       context.out.println("*******************************************************************************************************************************");
@@ -179,12 +219,12 @@ public class Upgrade extends InstallAbstract {
    }
 
 
-   private void upgradeJDK(String jdkPrefix, String endOfLine, String[] keepArguments, File tmpFile, File targetFile, File bkp, String... keepingPrefixes) throws Exception {
+   private void upgradeJDK(ActionContext context, String jdkPrefix, String endOfLine, String[] keepArguments, File tmpFile, File targetFile, File bkpFile, String... keepingPrefixes) throws Exception {
 
       final HashMap<String, String> replaceMatrix = new HashMap<>();
       final HashMap<String, String> currentArguments = new HashMap<>();
 
-      doUpgrade(tmpFile, targetFile, bkp,
+      doUpgrade(context, tmpFile, targetFile, bkpFile,
                 oldLine -> {
                    if (oldLine.trim().startsWith(jdkPrefix)) {
                       JVMArgumentParser.parseOriginalArgs(jdkPrefix, endOfLine, oldLine, keepArguments, currentArguments);
@@ -213,8 +253,8 @@ public class Upgrade extends InstallAbstract {
                 });
    }
 
-   private void replaceLines(File tmpFile, File targetFile, File bkp, String... replacePairs) throws Exception {
-      doUpgrade(tmpFile, targetFile, bkp,
+   private void replaceLines(ActionContext context, File tmpFile, File targetFile, File bkpFile, String... replacePairs) throws Exception {
+      doUpgrade(context, tmpFile, targetFile, bkpFile,
                 null,
                 newLine -> {
                    for (int i = 0; i < replacePairs.length; i += 2) {
@@ -226,30 +266,35 @@ public class Upgrade extends InstallAbstract {
                 });
    }
 
-   private void upgrade(File tmpFile, File targetFile, File bkp, String... keepingPrefixes) throws Exception {
+   private void upgrade(ActionContext context, File tmpFile, File targetFile, File bkpFile, String... keepingPrefixes) throws Exception {
       HashMap<String, String> replaceMatrix = new HashMap<>();
 
-      doUpgrade(tmpFile, targetFile, bkp,
+      doUpgrade(context, tmpFile, targetFile, bkpFile,
               oldLine -> {
-                 for (String prefix : keepingPrefixes) {
-                    if (oldLine.trim().startsWith(prefix)) {
-                       replaceMatrix.put(prefix, oldLine);
+                 if (keepingPrefixes.length > 0) {
+                    for (String prefix : keepingPrefixes) {
+                       if (oldLine.trim().startsWith(prefix)) {
+                          replaceMatrix.put(prefix, oldLine);
+                       }
                     }
                  }
               },
             newLine -> {
-               for (String prefix : keepingPrefixes) {
-                  if (newLine.trim().startsWith(prefix)) {
-                     String originalLine = replaceMatrix.get(prefix);
-                     return originalLine;
+               if (keepingPrefixes.length > 0) {
+                  for (String prefix : keepingPrefixes) {
+                     if (newLine.trim().startsWith(prefix)) {
+                        String originalLine = replaceMatrix.get(prefix);
+                        return originalLine;
+                     }
                   }
                }
                return newLine;
             });
    }
 
-   private void doUpgrade(File tmpFile, File targetFile, File bkp, Consumer<String> originalConsumer, Function<String, String> targetFunction) throws Exception {
-      Files.copy(targetFile.toPath(), bkp.toPath(), StandardCopyOption.REPLACE_EXISTING);
+   private void doUpgrade(ActionContext context, File tmpFile, File targetFile, File bkpFile, Consumer<String> originalConsumer, Function<String, String> targetFunction) throws Exception {
+      context.out.println("Copying " + targetFile + " to " + bkpFile);
+      Files.copy(targetFile.toPath(), bkpFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
 
       // we first scan the original lines on the originalConsumer, giving a chance to the caller to fill out the original matrix
       if (originalConsumer != null) {
@@ -260,6 +305,8 @@ public class Upgrade extends InstallAbstract {
          }
       }
 
+      context.out.println("Updating " + targetFile);
+
       // now we open the new file from the tmp, and we will give a chance for the targetFunction to replace lines from a matrix
       try (Stream<String> lines = Files.lines(tmpFile.toPath());
            PrintStream streamOutput = new PrintStream(new FileOutputStream(targetFile))) {
@@ -275,21 +322,21 @@ public class Upgrade extends InstallAbstract {
       }
    }
 
-   private void upgradeLogging(ActionContext context, File bkpFolder, File etc) throws Exception {
-      File oldLogging = new File(etc, OLD_LOGGING_PROPERTIES);
+   private void upgradeLogging(ActionContext context, File etcFolder, File bkpFolder) throws Exception {
+      File oldLogging = new File(etcFolder, OLD_LOGGING_PROPERTIES);
 
       if (oldLogging.exists()) {
          File oldLoggingCopy = new File(bkpFolder, OLD_LOGGING_PROPERTIES);
          context.out.println("Copying " + oldLogging.toPath() + " to " + oldLoggingCopy.toPath());
 
-         Files.copy(oldLogging.toPath(), bkpFolder.toPath(), StandardCopyOption.REPLACE_EXISTING);
+         Files.copy(oldLogging.toPath(), oldLoggingCopy.toPath(), StandardCopyOption.REPLACE_EXISTING);
 
          context.out.println("Removing " + oldLogging.toPath());
          if (!oldLogging.delete()) {
             context.out.println(oldLogging.toPath() + " could not be removed!");
          }
 
-         File newLogging = new File(etc, Create.ETC_LOG4J2_PROPERTIES);
+         File newLogging = new File(etcFolder, Create.ETC_LOG4J2_PROPERTIES);
          if (!newLogging.exists()) {
             context.out.println("Creating " + newLogging);
             try (InputStream inputStream = openStream("etc/" + Create.ETC_LOG4J2_PROPERTIES);
@@ -300,11 +347,11 @@ public class Upgrade extends InstallAbstract {
       }
    }
 
-   protected File findBackup(ActionContext context) {
+   protected File findBackup(ActionContext context) throws IOException {
       for (int bkp = 0; bkp < 10; bkp++) {
          File bkpFolder = new File(directory, "old-config-bkp." + bkp);
          if (!bkpFolder.exists()) {
-            bkpFolder.mkdirs();
+            Files.createDirectory(bkpFolder.toPath());
             context.out.println("Using " + bkpFolder.getAbsolutePath() + " as a backup folder for the modified files");
             return bkpFolder;
          }
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 3e76ae4dcc..77ef291882 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -220,6 +220,22 @@
                      </resources>
                   </configuration>
                </execution>
+               <execution>
+                  <id>copy-reference-for-upgrade-backup-checks</id>
+                  <phase>process-test-resources</phase>
+                  <goals>
+                     <goal>copy-resources</goal>
+                  </goals>
+                  <configuration>
+                     <outputDirectory>${project.build.directory}/reference-for-backup-check</outputDirectory>
+                     <resources>
+                        <resource>
+                           <directory>src/main/filtered-resources</directory>
+                           <filtering>true</filtering>
+                        </resource>
+                     </resources>
+                  </configuration>
+               </execution>
             </executions>
          </plugin>
          <plugin>
diff --git a/tests/smoke-tests/src/main/filtered-resources/servers/windowsUpgrade/bin/artemis-service.exe b/tests/smoke-tests/src/main/filtered-resources/servers/windowsUpgrade/bin/artemis-service.exe
new file mode 100644
index 0000000000..f41bd05916
--- /dev/null
+++ b/tests/smoke-tests/src/main/filtered-resources/servers/windowsUpgrade/bin/artemis-service.exe
@@ -0,0 +1 @@
+This is a fake exe file, just checking it gets copied before being overwritten.
diff --git a/tests/smoke-tests/src/main/filtered-resources/servers/windowsUpgrade/bin/artemis-service.exe.config b/tests/smoke-tests/src/main/filtered-resources/servers/windowsUpgrade/bin/artemis-service.exe.config
new file mode 100644
index 0000000000..816ca742a5
--- /dev/null
+++ b/tests/smoke-tests/src/main/filtered-resources/servers/windowsUpgrade/bin/artemis-service.exe.config
@@ -0,0 +1,6 @@
+<configuration>
+   <startup>
+      <supportedRuntime version="v2.0.50727" />
+      <supportedRuntime version="v4.0" />
+   </startup>
+</configuration>
\ No newline at end of file
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/upgradeTest/CompareUpgradeTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/upgradeTest/CompareUpgradeTest.java
index b287c6a18a..e2cb71a2c3 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/upgradeTest/CompareUpgradeTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/upgradeTest/CompareUpgradeTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.activemq.artemis.tests.smoke.upgradeTest;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
@@ -25,6 +29,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import org.apache.activemq.artemis.cli.commands.Create;
+import org.apache.activemq.artemis.cli.commands.Upgrade;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -50,6 +56,12 @@ public class CompareUpgradeTest {
 
       compareDirectories(windowsExpectedBin, windowsBin);
       compareDirectories(windowsExpectedETC, windowsETC, "broker.xml", "artemis-users.properties");
+
+      String referenceBin = basedir + "/target/reference-for-backup-check/servers/windowsUpgrade/bin";
+      String referenceEtc = basedir + "/target/reference-for-backup-check/servers/windowsUpgradeETC";
+
+      verifyBackupFiles(windows + "/old-config-bkp.0/bin", referenceBin, Create.ARTEMIS_CMD, Create.ARTEMIS_SERVICE_EXE, Create.ARTEMIS_SERVICE_EXE_CONFIG, Create.ARTEMIS_SERVICE_XML);
+      verifyBackupFiles(windows + "/old-config-bkp.0/etc", referenceEtc, Create.ETC_ARTEMIS_PROFILE_CMD, Create.ETC_BOOTSTRAP_XML, Upgrade.OLD_LOGGING_PROPERTIES);
    }
 
    @Test
@@ -64,8 +76,42 @@ public class CompareUpgradeTest {
 
       compareDirectories(linuxExpectedBin, linuxBin);
       compareDirectories(linuxExpectedETC, linuxETC, "broker.xml", "artemis-users.properties");
+
+      String referenceBin = basedir + "/target/reference-for-backup-check/servers/linuxUpgrade/bin";
+      String referenceEtc = basedir + "/target/reference-for-backup-check/servers/linuxUpgradeETC";
+
+      verifyBackupFiles(linux + "/old-config-bkp.0/bin", referenceBin, Create.ARTEMIS, Create.ARTEMIS_SERVICE);
+      verifyBackupFiles(linux + "/old-config-bkp.0/etc", referenceEtc, Create.ETC_ARTEMIS_PROFILE, Create.ETC_BOOTSTRAP_XML, Upgrade.OLD_LOGGING_PROPERTIES);
    }
 
+   private void verifyBackupFiles(String backupFolder, String referenceFolder, String... files) throws Exception {
+      assertTrue("Files to check must be specified", files.length > 0);
+
+      File bck = new File(backupFolder);
+      if (!(bck.exists() && bck.isDirectory())) {
+         Assert.fail("Backup folder does not exist at: " + bck.getAbsolutePath());
+      }
+
+      File[] backupFiles = bck.listFiles();
+      assertNotNull("Some backup files must exist", backupFiles);
+      int backupFilesCount = backupFiles.length;
+      assertTrue("Some backup files must exist", backupFilesCount > 0);
+      assertEquals("Different number of backup files found than specified for inspection, update test if backup procedure changed", files.length, backupFilesCount);
+
+      for (String f : files) {
+         File bf = new File(backupFolder, f);
+         if (!bf.exists()) {
+            Assert.fail("Expected backup file does not exist at: " + bf.getAbsolutePath());
+         }
+
+         File reference = new File(referenceFolder, bf.getName());
+         if (!reference.exists()) {
+            Assert.fail("Reference file does not exist at: " + reference.getAbsolutePath());
+         }
+
+         Assert.assertArrayEquals(bf.getName() + " backup contents do not match reference file", Files.readAllBytes(bf.toPath()), Files.readAllBytes(reference.toPath()));
+      }
+   }
 
    private void compareDirectories(String expectedFolder, String upgradeFolder, String... ignoredFiles) throws Exception {
       File expectedFolderFile = new File(expectedFolder);


[activemq-artemis] 07/13: ARTEMIS-4085 exclusive LVQ sending all messages to consumer

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f1030ddc19c0b317164b4e20aa1b8af66bb3b659
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Nov 7 15:07:35 2022 -0600

    ARTEMIS-4085 exclusive LVQ sending all messages to consumer
    
    (cherry picked from commit ca580814de2c2f4466d00e76d5cf70935a94ff81)
---
 .../artemis/core/server/impl/QueueImpl.java        |   1 +
 .../artemis/tests/integration/server/LVQTest.java  |  24 ++++
 .../tests/integration/stomp/StompLVQTest.java      | 134 +++++++++++++++++++++
 3 files changed, 159 insertions(+)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d222d192fd..488109c83e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3155,6 +3155,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
             if (groupConsumer != null) {
                if (noDelivery > 0) {
+                  pruneLastValues();
                   break;
                }
                noDelivery = 0;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index a37b690d4d..4c8d14a34d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -86,6 +86,30 @@ public class LVQTest extends ActiveMQTestBase {
       Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
    }
 
+   @Test
+   public void testSimpleExclusive() throws Exception {
+      ServerLocator locator = createNettyNonHALocator().setConsumerWindowSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
+      final String EXCLUSIVE_QUEUE = "exclusiveQueue";
+
+      clientSession.createQueue(new QueueConfiguration(EXCLUSIVE_QUEUE).setExclusive(true).setLastValue(true));
+      ClientProducer producer = clientSession.createProducer(EXCLUSIVE_QUEUE);
+      ClientConsumer consumer = clientSession.createConsumer(EXCLUSIVE_QUEUE);
+      clientSession.start();
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      SimpleString rh = new SimpleString("SMID1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      ClientMessage m2 = createTextMessage(clientSession, "m2");
+      m2.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh);
+      producer.send(m1);
+      producer.send(m2);
+      ClientMessage m = consumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
+   }
+
    @Test
    public void testSimpleRestart() throws Exception {
       ClientProducer producer = clientSession.createProducer(address);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
new file mode 100644
index 0000000000..8004199848
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompLVQTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(Parameterized.class)
+public class StompLVQTest extends StompTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   protected StompClientConnection producerConn;
+   protected StompClientConnection consumerConn;
+
+   private final String queue = "lvq";
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      server.createQueue(new QueueConfiguration(queue).setLastValue(true).setExclusive(true));
+
+      producerConn = StompClientConnectionFactory.createClientConnection(uri);
+      consumerConn = StompClientConnectionFactory.createClientConnection(uri);
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (producerConn != null && producerConn.isConnected()) {
+            try {
+               producerConn.disconnect();
+            } catch (Exception e) {
+               // ignore
+            }
+         }
+      } finally {
+         producerConn.closeTransport();
+      }
+
+      try {
+         if (consumerConn != null && consumerConn.isConnected()) {
+            try {
+               consumerConn.disconnect();
+            } catch (Exception e) {
+               // ignore
+            }
+         }
+      } finally {
+         consumerConn.closeTransport();
+      }
+
+      super.tearDown();
+   }
+
+   @Test
+   public void testLVQ() throws Exception {
+
+      producerConn.connect(defUser, defPass);
+      consumerConn.connect(defUser, defPass);
+
+      subscribe(consumerConn, "lvqtest", Stomp.Headers.Subscribe.AckModeValues.CLIENT, null, null, queue, true, 0);
+
+      try {
+         for (int i = 1; i <= 100; i++) {
+            String uuid = UUID.randomUUID().toString();
+
+            ClientStompFrame frame = producerConn.sendFrame(producerConn.createFrame(Stomp.Commands.SEND)
+                                                       .addHeader(Stomp.Headers.Send.DESTINATION, queue)
+                                                       .addHeader(Message.HDR_LAST_VALUE_NAME.toString(), "test")
+                                                       .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid)
+                                                       .setBody(String.valueOf(i)));
+
+            assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
+            assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+         }
+      } catch (Exception e) {
+         logger.error(null, e);
+      }
+
+      List<ClientStompFrame> messages = new ArrayList<>();
+      try {
+         ClientStompFrame frame;
+
+         while ((frame = consumerConn.receiveFrame(10000)) != null) {
+            assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+
+            ack(consumerConn, null, frame);
+
+            messages.add(frame);
+         }
+      } catch (Exception e) {
+         logger.error(null, e);
+      }
+
+      Assert.assertEquals(2, messages.size());
+      Assert.assertEquals("1", messages.get(0).getBody());
+      Assert.assertEquals("100", messages.get(1).getBody());
+   }
+}
\ No newline at end of file


[activemq-artemis] 02/13: ARTEMIS-4084 Fixing CriticalAnalyzer policy on Test

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 220826b9b8c3a8b9e95315ba3c6f9d893c3b5ba9
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Nov 9 16:49:50 2022 -0500

    ARTEMIS-4084 Fixing CriticalAnalyzer policy on Test
    
    (cherry picked from commit ae4475201e35695ebf6f8a1874744fe535a8be13)
---
 .../artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
index 9e5bbbef65..e4641f9696 100644
--- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
+++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java
@@ -56,7 +56,7 @@ public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
       config.setCriticalAnalyzerTimeout(10000);
       config.setCriticalAnalyzerCheckPeriod(5000);
       config.setConnectionTTLOverride(5000);
-      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
+      config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.SHUTDOWN);
       server = createServer(false, config);
       server.start();
    }


[activemq-artemis] 05/13: NO-JIRA small tweak on test

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch 2.27.x
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 902e912f0de9013c46c7b37e38da5047abe4ab9a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Nov 10 07:51:45 2022 -0500

    NO-JIRA small tweak on test
    
    (cherry picked from commit 42529899d01df63c4aea5f562a2db8058a455a5e)
---
 .../org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
index f95e91fc2c..b39a9e75dc 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java
@@ -108,8 +108,14 @@ public class LinkedListTest extends ActiveMQTestBase {
       Assert.assertEquals(1, scans);
       list.addSorted(10);
       list.addSorted(20);
+      list.addSorted(19);
       list.addSorted(7); // this will need a scan as it's totally random
       Assert.assertEquals(2, scans);
+      list.addSorted(8);
+      Assert.assertEquals(2, scans);
+      Assert.assertEquals(1, (int)list.poll());
+      list.addSorted(9);
+      Assert.assertEquals(3, scans); // remove (poll) should clear the last added cache
       printDebug();
 
       validateOrder(null);