You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/07 20:36:01 UTC
[1/2] activemq-artemis git commit: ARTEMIS-2076 Make Filter update-able
Repository: activemq-artemis
Updated Branches:
refs/heads/master cbe46ce89 -> 3cedfbdd8
ARTEMIS-2076 Make Filter update-able
Add Tests
Add implementation in line with other updatable queue settings.
Enhance tests to ensure the queue is not destroyed during a config change and that messages already in the queue are preserved.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4b88f38b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4b88f38b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4b88f38b
Branch: refs/heads/master
Commit: 4b88f38b2df6b3c401542884ffe97c8a49069a5c
Parents: cbe46ce
Author: Michael André Pearce <mi...@me.com>
Authored: Fri Sep 7 06:05:33 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 7 15:49:03 2018 -0400
----------------------------------------------------------------------
.../core/management/ActiveMQServerControl.java | 1 +
.../impl/ActiveMQServerControlImpl.java | 5 +-
.../artemis/core/postoffice/PostOffice.java | 2 +
.../core/postoffice/impl/LocalQueueBinding.java | 8 +-
.../core/postoffice/impl/PostOfficeImpl.java | 5 +
.../artemis/core/server/ActiveMQServer.java | 1 +
.../activemq/artemis/core/server/Queue.java | 2 +
.../core/server/impl/ActiveMQServerImpl.java | 8 +-
.../artemis/core/server/impl/QueueImpl.java | 10 +-
.../impl/ScheduledDeliveryHandlerTest.java | 4 +
.../tests/integration/jms/RedeployTest.java | 97 ++++++++++++++++++++
.../ActiveMQServerControlUsingCoreTest.java | 12 ++-
.../persistence/ConfigChangeTest.java | 59 ++++++++++++
.../resources/reload-queue-filter-updated.xml | 42 +++++++++
.../src/test/resources/reload-queue-filter.xml | 42 +++++++++
.../unit/core/postoffice/impl/FakeQueue.java | 5 +
.../core/server/impl/fakes/FakePostOffice.java | 2 +
17 files changed, 290 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 6ce945c..5719fb6 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -705,6 +705,7 @@ public interface ActiveMQServerControl {
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+ @Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index f708c1d..ec6b0df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -868,12 +868,13 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
Boolean purgeOnNoConsumers,
Boolean exclusive,
String user) throws Exception {
- return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+ return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
}
@Override
public String updateQueue(String name,
String routingType,
+ String filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
@@ -885,7 +886,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
+ final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
if (queue == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index ce1fcfd..d31e33b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
@@ -66,6 +67,7 @@ public interface PostOffice extends ActiveMQComponent {
QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
+ Filter filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 176d614..79af5d0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -32,8 +32,6 @@ public class LocalQueueBinding implements QueueBinding {
private final Queue queue;
- private final Filter filter;
-
private final SimpleString clusterName;
private SimpleString name;
@@ -45,8 +43,6 @@ public class LocalQueueBinding implements QueueBinding {
this.name = queue.getName();
- filter = queue.getFilter();
-
clusterName = queue.getName().concat(nodeID);
}
@@ -57,7 +53,7 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public Filter getFilter() {
- return filter;
+ return queue.getFilter();
}
@Override
@@ -158,7 +154,7 @@ public class LocalQueueBinding implements QueueBinding {
", queue=" +
queue +
", filter=" +
- filter +
+ getFilter() +
", name=" +
name +
", clusterName=" +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 598c32b..d4cde18 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -465,6 +465,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
+ Filter filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
@@ -522,6 +523,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
}
+ if (filter != null && !filter.equals(queue.getFilter())) {
+ changed = true;
+ queue.setFilter(filter);
+ }
if (logger.isDebugEnabled()) {
if (user == null && queue.getUser() != null) {
logger.debug("Ignoring updating Queue to a NULL user");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 1883362..488c6fd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -537,6 +537,7 @@ public interface ActiveMQServer extends ServiceComponent {
Queue updateQueue(String name,
RoutingType routingType,
+ String filterString,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 70051c0..63d39c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -45,6 +45,8 @@ public interface Queue extends Bindable,CriticalComponent {
Filter getFilter();
+ void setFilter(Filter filter);
+
PageSubscription getPageSubscription();
RoutingType getRoutingType();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 498f4e9..dcb6e02 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -2790,7 +2790,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
- updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
+ updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
} else {
// if the address::queue doesn't exist then create it
try {
@@ -3252,19 +3252,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
Boolean purgeOnNoConsumers,
Boolean exclusive,
String user) throws Exception {
- return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+ return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
}
@Override
public Queue updateQueue(String name,
RoutingType routingType,
+ String filterString,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
String user) throws Exception {
- final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
+ final Filter filter = FilterImpl.createFilter(filterString);
+ final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
if (queueBinding != null) {
final Queue queue = queueBinding.getQueue();
return queue;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8af8aaa..2891350 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -40,8 +40,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@@ -119,6 +119,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private static final Logger logger = Logger.getLogger(QueueImpl.class);
private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime");
+ private static final AtomicReferenceFieldUpdater<QueueImpl, Filter> filterUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueImpl.class, Filter.class, "filter");
public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
@@ -695,7 +696,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public Filter getFilter() {
- return filter;
+ return filterUpdater.get(this);
+ }
+
+ @Override
+ public void setFilter(Filter filter) {
+ filterUpdater.set(this, filter);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 96de8c7..b21781d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -883,6 +883,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void setFilter(Filter filter) {
+ }
+
+ @Override
public PageSubscription getPageSubscription() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index ead96d5..600eead 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -38,6 +38,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
@@ -107,6 +108,102 @@ public class RedeployTest extends ActiveMQTestBase {
}
@Test
+ public void testRedeployFilter() throws Exception {
+ Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
+ URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
+ URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated.xml");
+ Files.copy(url1.openStream(), brokerXML);
+
+ EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
+ embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
+ embeddedActiveMQ.start();
+
+ final ReusableLatch latch = new ReusableLatch(1);
+
+ Runnable tick = new Runnable() {
+ @Override
+ public void run() {
+ latch.countDown();
+ }
+ };
+
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+
+ try {
+ latch.await(10, TimeUnit.SECONDS);
+
+ try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+ connection.start();
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setStringProperty("x", "x");
+ producer.send(message);
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull(consumer.receive(5000));
+ consumer.close();
+ }
+
+ //Send a message that should remain in the queue (this ensures config change is non-destructive)
+ try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+ connection.start();
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createTextMessage("hello");
+ message.setStringProperty("x", "x");
+ producer.send(message);
+ }
+
+ Binding binding = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue"));
+
+ Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
+ brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
+ latch.setCount(1);
+ embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+ latch.await(10, TimeUnit.SECONDS);
+
+ Binding bindingAfterChange = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myQueue"));
+
+ assertTrue("Instance should be the same (as should be non destructive)", binding == bindingAfterChange);
+ assertEquals(binding.getID(), bindingAfterChange.getID());
+
+ //Check that after the config change we can still consume a message that was sent before, ensuring config change was non-destructive of the queue.
+ try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+ connection.start();
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(5000);
+ assertNotNull(message);
+ assertEquals("hello", ((TextMessage)message).getText());
+ consumer.close();
+ }
+
+ try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
+ connection.start();
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+ Message message = session.createMessage();
+ message.setStringProperty("x", "y");
+ producer.send(message);
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull(consumer.receive(2000));
+ consumer.close();
+ }
+
+ } finally {
+ embeddedActiveMQ.stop();
+ }
+ }
+
+ @Test
public void testRedeployWithFailover() throws Exception {
EmbeddedActiveMQ live = new EmbeddedActiveMQ();
EmbeddedActiveMQ backup = new EmbeddedActiveMQ();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 2d78092..f1e7051 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -156,8 +156,16 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
}
@Override
- public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception {
- return null;
+ public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
+ @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+ @Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
+ @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
+ @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
+ @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
+ @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
+ @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
+ @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception {
+ return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
index fefc735..cc6a6f6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
@@ -88,4 +88,63 @@ public class ConfigChangeTest extends ActiveMQTestBase {
server.stop();
}
+
+ @Test
+ public void testChangeQueueFilterOnRestart() throws Exception {
+ final String filter1 = "x = 'x'";
+ final String filter2 = "x = 'y'";
+
+ Configuration configuration = createDefaultInVMConfig( );
+ configuration.addAddressesSetting("#", new AddressSettings());
+
+ List addressConfigurations = new ArrayList();
+ CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration()
+ .setName("myAddress")
+ .addRoutingType(RoutingType.ANYCAST)
+ .addQueueConfiguration(new CoreQueueConfiguration()
+ .setName("myQueue")
+ .setAddress("myAddress")
+ .setFilterString(filter1)
+ .setRoutingType(RoutingType.ANYCAST));
+ addressConfigurations.add(addressConfiguration);
+ configuration.setAddressConfigurations(addressConfigurations);
+ server = createServer(true, configuration);
+ server.start();
+
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://0");
+ try (JMSContext context = connectionFactory.createContext()) {
+ context.createProducer().setProperty("x", "x").send(context.createQueue("myAddress"), "hello");
+ }
+
+ long originalBindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
+
+ server.stop();
+
+ addressConfiguration = new CoreAddressConfiguration()
+ .setName("myAddress")
+ .addRoutingType(RoutingType.ANYCAST)
+ .addQueueConfiguration(new CoreQueueConfiguration()
+ .setName("myQueue")
+ .setAddress("myAddress")
+ .setFilterString(filter2)
+ .setRoutingType(RoutingType.ANYCAST));
+ addressConfigurations.clear();
+ addressConfigurations.add(addressConfiguration);
+ configuration.setAddressConfigurations(addressConfigurations);
+
+ server.start();
+ assertEquals(filter2, server.locateQueue(SimpleString.toSimpleString("myQueue")).getFilter().getFilterString().toString());
+
+ //Ensures the queue is not destroyed by checking message sent before change is consumable after (e.g. no message loss)
+ try (JMSContext context = connectionFactory.createContext()) {
+ Message message = context.createConsumer(context.createQueue("myAddress::myQueue")).receive();
+ assertEquals("hello", ((TextMessage) message).getText());
+ }
+
+ long bindingId = server.getPostOffice().getBinding(SimpleString.toSimpleString("myQueue")).getID();
+ assertEquals("Ensure the original queue is not destroyed by checking the binding id is the same", originalBindingId, bindingId);
+
+ server.stop();
+
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml
new file mode 100644
index 0000000..37ef67d
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-queue-filter-updated.xml
@@ -0,0 +1,42 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
+ </acceptors>
+
+ <addresses>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ <filter string="x = 'y'"/>
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/integration-tests/src/test/resources/reload-queue-filter.xml
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/resources/reload-queue-filter.xml b/tests/integration-tests/src/test/resources/reload-queue-filter.xml
new file mode 100644
index 0000000..47dbadc
--- /dev/null
+++ b/tests/integration-tests/src/test/resources/reload-queue-filter.xml
@@ -0,0 +1,42 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+ <core xmlns="urn:activemq:core">
+ <security-enabled>false</security-enabled>
+
+ <acceptors>
+ <acceptor name="artemis">tcp://0.0.0.0:61616</acceptor>
+ </acceptors>
+
+ <addresses>
+ <address name="myQueue">
+ <anycast>
+ <queue name="myQueue">
+ <filter string="x = 'x'"/>
+ </queue>
+ </anycast>
+ </address>
+ </addresses>
+ </core>
+</configuration>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 71ced7f..5297ab6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -427,6 +427,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void setFilter(Filter filter) {
+
+ }
+
+ @Override
public long getMessageCount() {
return messageCount;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4b88f38b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 44e5823..0bbe8ef 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -46,6 +47,7 @@ public class FakePostOffice implements PostOffice {
@Override
public QueueBinding updateQueue(SimpleString name,
RoutingType routingType,
+ Filter filter,
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
[2/2] activemq-artemis git commit: This closes #2296
Posted by cl...@apache.org.
This closes #2296
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3cedfbdd
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3cedfbdd
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3cedfbdd
Branch: refs/heads/master
Commit: 3cedfbdd875fc721d2b4dea5b5999b51a20eb8cb
Parents: cbe46ce 4b88f38
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Sep 7 16:35:35 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Sep 7 16:35:35 2018 -0400
----------------------------------------------------------------------
.../core/management/ActiveMQServerControl.java | 1 +
.../impl/ActiveMQServerControlImpl.java | 5 +-
.../artemis/core/postoffice/PostOffice.java | 2 +
.../core/postoffice/impl/LocalQueueBinding.java | 8 +-
.../core/postoffice/impl/PostOfficeImpl.java | 5 +
.../artemis/core/server/ActiveMQServer.java | 1 +
.../activemq/artemis/core/server/Queue.java | 2 +
.../core/server/impl/ActiveMQServerImpl.java | 8 +-
.../artemis/core/server/impl/QueueImpl.java | 10 +-
.../impl/ScheduledDeliveryHandlerTest.java | 4 +
.../tests/integration/jms/RedeployTest.java | 97 ++++++++++++++++++++
.../ActiveMQServerControlUsingCoreTest.java | 12 ++-
.../persistence/ConfigChangeTest.java | 59 ++++++++++++
.../resources/reload-queue-filter-updated.xml | 42 +++++++++
.../src/test/resources/reload-queue-filter.xml | 42 +++++++++
.../unit/core/postoffice/impl/FakeQueue.java | 5 +
.../core/server/impl/fakes/FakePostOffice.java | 2 +
17 files changed, 290 insertions(+), 15 deletions(-)
----------------------------------------------------------------------