You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/07 20:36:01 UTC

[1/2] activemq-artemis git commit: ARTEMIS-2076 Make Filter update-able

Repository: activemq-artemis
Updated Branches:
  refs/heads/master cbe46ce89 -> 3cedfbdd8


ARTEMIS-2076 Make Filter update-able

Add Tests
Add implementation inline with other queue updatable settings.
Enhance tests to ensure queue is not destroyed during config change and messages in queue already are preserved


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b88f38b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b88f38b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b88f38b

Branch: refs/heads/master
Commit: 4b88f38b2df6b3c401542884ffe97c8a49069a5c
Parents: cbe46ce
Author: Michael André Pearce <mi...@me.com>
Authored: Fri Sep 7 06:05:33 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 7 15:49:03 2018 -0400

----------------------------------------------------------------------
 .../core/management/ActiveMQServerControl.java  |  1 +
 .../impl/ActiveMQServerControlImpl.java         |  5 +-
 .../artemis/core/postoffice/PostOffice.java     |  2 +
 .../core/postoffice/impl/LocalQueueBinding.java |  8 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |  5 +
 .../artemis/core/server/ActiveMQServer.java     |  1 +
 .../activemq/artemis/core/server/Queue.java     |  2 +
 .../core/server/impl/ActiveMQServerImpl.java    |  8 +-
 .../artemis/core/server/impl/QueueImpl.java     | 10 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  4 +
 .../tests/integration/jms/RedeployTest.java     | 97 ++++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java     | 12 ++-
 .../persistence/ConfigChangeTest.java           | 59 ++++++++++++
 .../resources/reload-queue-filter-updated.xml   | 42 +++++++++
 .../src/test/resources/reload-queue-filter.xml  | 42 +++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  5 +
 .../core/server/impl/fakes/FakePostOffice.java  |  2 +
 17 files changed, 290 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 6ce945c..5719fb6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -705,6 +705,7 @@ public interface ActiveMQServerControl {
    @Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
    String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
                       @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+                      @Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
                       @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
                       @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
                       @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index f708c1d..ec6b0df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -868,12 +868,13 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
                              String user) throws Exception {
-      return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
    }
 
    @Override
    public String updateQueue(String name,
                              String routingType,
+                             String filter,
                              Integer maxConsumers,
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
@@ -885,7 +886,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       clearIO();
 
       try {
-         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
+         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
          if (queue == null) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index ce1fcfd..d31e33b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -66,6 +67,7 @@ public interface PostOffice extends ActiveMQComponent {
 
    QueueBinding updateQueue(SimpleString name,
                             RoutingType routingType,
+                            Filter filter,
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 176d614..79af5d0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -32,8 +32,6 @@ public class LocalQueueBinding implements QueueBinding {
 
    private final Queue queue;
 
-   private final Filter filter;
-
    private final SimpleString clusterName;
 
    private SimpleString name;
@@ -45,8 +43,6 @@ public class LocalQueueBinding implements QueueBinding {
 
       this.name = queue.getName();
 
-      filter = queue.getFilter();
-
       clusterName = queue.getName().concat(nodeID);
    }
 
@@ -57,7 +53,7 @@ public class LocalQueueBinding implements QueueBinding {
 
    @Override
    public Filter getFilter() {
-      return filter;
+      return queue.getFilter();
    }
 
    @Override
@@ -158,7 +154,7 @@ public class LocalQueueBinding implements QueueBinding {
          ", queue=" +
          queue +
          ", filter=" +
-         filter +
+         getFilter() +
          ", name=" +
          name +
          ", clusterName=" +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 598c32b..d4cde18 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -465,6 +465,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
    @Override
    public QueueBinding updateQueue(SimpleString name,
                                    RoutingType routingType,
+                                   Filter filter,
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
@@ -522,6 +523,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             changed = true;
             queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
          }
+         if (filter != null && !filter.equals(queue.getFilter())) {
+            changed = true;
+            queue.setFilter(filter);
+         }
          if (logger.isDebugEnabled()) {
             if (user == null && queue.getUser() != null) {
                logger.debug("Ignoring updating Queue to a NULL user");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 1883362..488c6fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -537,6 +537,7 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue updateQueue(String name,
                      RoutingType routingType,
+                     String filterString,
                      Integer maxConsumers,
                      Boolean purgeOnNoConsumers,
                      Boolean exclusive,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 70051c0..63d39c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -45,6 +45,8 @@ public interface Queue extends Bindable,CriticalComponent {
 
    Filter getFilter();
 
+   void setFilter(Filter filter);
+
    PageSubscription getPageSubscription();
 
    RoutingType getRoutingType();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 498f4e9..dcb6e02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2790,7 +2790,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
 
             if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
-               updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
+               updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
             } else {
                // if the address::queue doesn't exist then create it
                try {
@@ -3252,19 +3252,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
                             String user) throws Exception {
-      return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
    }
 
    @Override
    public Queue updateQueue(String name,
                             RoutingType routingType,
+                            String filterString,
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
                             Integer consumersBeforeDispatch,
                             Long delayBeforeDispatch,
                             String user) throws Exception {
-      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
+      final Filter filter = FilterImpl.createFilter(filterString);
+      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
       if (queueBinding != null) {
          final Queue queue = queueBinding.getQueue();
          return queue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8af8aaa..2891350 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -40,8 +40,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
-
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@@ -119,6 +119,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private static final Logger logger = Logger.getLogger(QueueImpl.class);
    private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
    private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime");
+   private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter");
 
    public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
 
@@ -695,7 +696,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public Filter getFilter() {
-      return filter;
+      return filterUpdater.get(this);
+   }
+
+   @Override
+   public void setFilter(Filter filter) {
+      filterUpdater.set(this, filter);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 96de8c7..b21781d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -883,6 +883,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void setFilter(Filter filter) {
+      }
+
+      @Override
       public PageSubscription getPageSubscription() {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index ead96d5..600eead 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -38,6 +38,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
@@ -107,6 +108,102 @@ public class RedeployTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testRedeployFilter() throws Exception {
+      Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
+      URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated.xml");
+      Files.copy(url1.openStream(), brokerXML);
+
+      EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+      embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+      embeddedActiveMQ.start();
+
+      final ReusableLatch latch = new ReusableLatch(1);
+
+      Runnable tick = new Runnable() {
+         @Override
+         public void run() {
+            latch.countDown();
+         }
+      };
+
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+      try {
+         latch.await(10, TimeUnit.SECONDS);
+
+         try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+              Connection connection = factory.createConnection();
+              Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+            connection.start();
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+            Message message = session.createMessage();
+            message.setStringProperty("x", "x");
+            producer.send(message);
+            MessageConsumer consumer = session.createConsumer(queue);
+            assertNotNull(consumer.receive(5000));
+            consumer.close();
+         }
+
+         //Send a message that should remain in the queue (this ensures config change is non-destructive)
+         try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+              Connection connection = factory.createConnection();
+              Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+            connection.start();
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+            Message message = session.createTextMessage("hello");
+            message.setStringProperty("x", "x");
+            producer.send(message);
+         }
+
+         Binding binding = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue"));
+
+         Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
+         brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+         latch.setCount(1);
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+         latch.await(10, TimeUnit.SECONDS);
+
+         Binding bindingAfterChange = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue"));
+
+         assertTrue("Instance should be the same (as should be non destructive)", binding == bindingAfterChange);
+         assertEquals(binding.getID(), bindingAfterChange.getID());
+
+         //Check that after the config change we can still consume a message that was sent before, ensuring config change was non-destructive of the queue.
+         try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+              Connection connection = factory.createConnection();
+              Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+            connection.start();
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals("hello", ((TextMessage)message).getText());
+            consumer.close();
+         }
+
+         try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+              Connection connection = factory.createConnection();
+              Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+            connection.start();
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+            Message message = session.createMessage();
+            message.setStringProperty("x", "y");
+            producer.send(message);
+            MessageConsumer consumer = session.createConsumer(queue);
+            assertNotNull(consumer.receive(2000));
+            consumer.close();
+         }
+
+      } finally {
+         embeddedActiveMQ.stop();
+      }
+   }
+
+   @Test
    public void testRedeployWithFailover() throws Exception {
       EmbeddedActiveMQ live = new EmbeddedActiveMQ();
       EmbeddedActiveMQ backup = new EmbeddedActiveMQ();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 2d78092..f1e7051 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -156,8 +156,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
-         public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception {
-            return null;
+         public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
+                                   @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+                                   @Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
+                                   @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
+                                   @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
+                                   @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
+                                   @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
+                                   @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
+                                   @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception {
+            return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
          }
 
          @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
index fefc735..cc6a6f6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
@@ -88,4 +88,63 @@ public class ConfigChangeTest extends ActiveMQTestBase {
 
       server.stop();
    }
+
+   @Test
+   public void testChangeQueueFilterOnRestart() throws Exception {
+      final String filter1 = "x = 'x'";
+      final String filter2 = "x = 'y'";
+
+      Configuration configuration = createDefaultInVMConfig(  );
+      configuration.addAddressesSetting("#", new AddressSettings());
+
+      List addressConfigurations = new ArrayList();
+      CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
+         .setName("myAddress")
+         .addRoutingType(RoutingType.ANYCAST)
+         .addQueueConfiguration(new CoreQueueConfiguration()
+                                   .setName("myQueue")
+                                   .setAddress("myAddress")
+                                   .setFilterString(filter1)
+                                   .setRoutingType(RoutingType.ANYCAST));
+      addressConfigurations.add(addressConfiguration);
+      configuration.setAddressConfigurations(addressConfigurations);
+      server = createServer(true, configuration);
+      server.start();
+
+      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
+      try (JMSContext context = connectionFactory.createContext()) {
+         context.createProducer().setProperty("x", "x").send(context.createQueue("myAddress"), "hello");
+      }
+
+      long originalBindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
+
+      server.stop();
+
+      addressConfiguration = new CoreAddressConfiguration()
+         .setName("myAddress")
+         .addRoutingType(RoutingType.ANYCAST)
+         .addQueueConfiguration(new CoreQueueConfiguration()
+                                   .setName("myQueue")
+                                   .setAddress("myAddress")
+                                   .setFilterString(filter2)
+                                   .setRoutingType(RoutingType.ANYCAST));
+      addressConfigurations.clear();
+      addressConfigurations.add(addressConfiguration);
+      configuration.setAddressConfigurations(addressConfigurations);
+
+      server.start();
+      assertEquals(filter2, server.locateQueue(SimpleString.toSimpleString("myQueue")).getFilter().getFilterString().toString());
+
+      //Ensures the queue is not destroyed by checking message sent before change is consumable after (e.g. no message loss)
+      try (JMSContext context = connectionFactory.createContext()) {
+         Message message = context.createConsumer(context.createQueue("myAddress::myQueue")).receive();
+         assertEquals("hello", ((TextMessage) message).getText());
+      }
+
+      long bindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
+      assertEquals("Ensure the original queue is not destroyed by checking the binding id is the same", originalBindingId, bindingId);
+
+      server.stop();
+
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml
new file mode 100644
index 0000000..37ef67d
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml
@@ -0,0 +1,42 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+      <security-enabled>false</security-enabled>
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+                  <filter string="x = 'y'"/>
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/resources/reload-queue-filter.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-filter.xml b/tests/integration-tests/src/test/resources/reload-queue-filter.xml
new file mode 100644
index 0000000..47dbadc
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-queue-filter.xml
@@ -0,0 +1,42 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core">
+      <security-enabled>false</security-enabled>
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
+      </acceptors>
+
+      <addresses>
+         <address name="myQueue">
+            <anycast>
+               <queue name="myQueue">
+                  <filter string="x = 'x'"/>
+               </queue>
+            </anycast>
+         </address>
+      </addresses>
+   </core>
+</configuration>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 71ced7f..5297ab6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -427,6 +427,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void setFilter(Filter filter) {
+
+   }
+
+   @Override
    public long getMessageCount() {
       return messageCount;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 44e5823..0bbe8ef 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -46,6 +47,7 @@ public class FakePostOffice implements PostOffice {
    @Override
    public QueueBinding updateQueue(SimpleString name,
                                    RoutingType routingType,
+                                   Filter filter,
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,


[2/2] activemq-artemis git commit: This closes #2296

Posted by cl...@apache.org.
This closes #2296


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3cedfbdd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3cedfbdd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3cedfbdd

Branch: refs/heads/master
Commit: 3cedfbdd875fc721d2b4dea5b5999b51a20eb8cb
Parents: cbe46ce 4b88f38
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Sep 7 16:35:35 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 7 16:35:35 2018 -0400

----------------------------------------------------------------------
 .../core/management/ActiveMQServerControl.java  |  1 +
 .../impl/ActiveMQServerControlImpl.java         |  5 +-
 .../artemis/core/postoffice/PostOffice.java     |  2 +
 .../core/postoffice/impl/LocalQueueBinding.java |  8 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |  5 +
 .../artemis/core/server/ActiveMQServer.java     |  1 +
 .../activemq/artemis/core/server/Queue.java     |  2 +
 .../core/server/impl/ActiveMQServerImpl.java    |  8 +-
 .../artemis/core/server/impl/QueueImpl.java     | 10 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  4 +
 .../tests/integration/jms/RedeployTest.java     | 97 ++++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java     | 12 ++-
 .../persistence/ConfigChangeTest.java           | 59 ++++++++++++
 .../resources/reload-queue-filter-updated.xml   | 42 +++++++++
 .../src/test/resources/reload-queue-filter.xml  | 42 +++++++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  5 +
 .../core/server/impl/fakes/FakePostOffice.java  |  2 +
 17 files changed, 290 insertions(+), 15 deletions(-)
----------------------------------------------------------------------