You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/01 06:07:17 UTC
[activemq-artemis] branch master updated: ARTEMIS-2328 Routing
after empty addresses could lead to invalid messages
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new fa259ba ARTEMIS-2328 Routing after empty addresses could lead to invalid messages
new bede603 This closes #2656
fa259ba is described below
commit fa259ba66e6fa653b90d81f64d5162b8f5d7b341
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed May 1 00:29:31 2019 -0400
ARTEMIS-2328 Routing after empty addresses could lead to invalid messages
---
.../artemis/core/postoffice/impl/BindingsImpl.java | 21 ++-
.../core/postoffice/impl/LocalQueueBinding.java | 13 ++
.../core/postoffice/impl/PostOfficeImpl.java | 2 +-
.../artemis/core/server/RoutingContext.java | 1 -
.../core/server/impl/RoutingContextImpl.java | 32 ++++
.../artemis/tests/util/SingleServerTestBase.java | 6 +-
.../tests/integration/client/MixRoutingTest.java | 197 +++++++++++++++++++++
7 files changed, 260 insertions(+), 12 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index c3bf31a..f5bb2a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -282,7 +282,8 @@ public final class BindingsImpl implements Bindings {
private void route(final Message message,
final RoutingContext context,
final boolean groupRouting) throws Exception {
- boolean reusableContext = context.isReusable(message, version.get());
+ int currentVersion = version.get();
+ boolean reusableContext = context.isReusable(message, currentVersion);
if (!reusableContext) {
context.clear();
@@ -310,13 +311,18 @@ public final class BindingsImpl implements Bindings {
boolean routed = false;
+ boolean hasExclusives = false;
+
for (Binding binding : exclusiveBindings) {
+ if (!hasExclusives) {
+ context.clear().setReusable(false);
+ hasExclusives = true;
+ }
if (binding.getFilter() == null || binding.getFilter().match(message)) {
binding.getBindable().route(message, context);
routed = true;
}
- context.setReusable(false);
}
if (!routed) {
// Remove the ids now, in order to avoid double check
@@ -332,6 +338,7 @@ public final class BindingsImpl implements Bindings {
context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
+ context.clear().setReusable(false);
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
if (theBinding != null) {
theBinding.route(message, context);
@@ -340,21 +347,17 @@ public final class BindingsImpl implements Bindings {
// in a optimization, we are reusing the previous context if everything is right for it
// so the simpleRouting will only happen if needed
if (!reusableContext) {
- simpleRouting(message, context);
+ simpleRouting(message, context, currentVersion);
}
}
}
}
- private void simpleRouting(Message message, RoutingContext context) throws Exception {
+ private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
if (logger.isTraceEnabled()) {
- logger.trace("Routing message " + message + " on binding=" + this);
+ logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
}
- // We check at the version before we started routing,
- // this is because if something changed in between we want to check the correct version
- int currentVersion = version.get();
-
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
SimpleString routingName = entry.getKey();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 79ab4d3..ed5eb3b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
+import org.jboss.logging.Logger;
public class LocalQueueBinding implements QueueBinding {
+ private static final Logger logger = Logger.getLogger(LocalQueueBinding.class);
+
private final SimpleString address;
private final Queue queue;
@@ -119,13 +122,23 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public void route(final Message message, final RoutingContext context) throws Exception {
if (isMatchRoutingType(context)) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("adding routing " + queue.getID() + " on message " + message);
+ }
queue.route(message, context);
+ } else {
+ if (logger.isTraceEnabled()) {
+ logger.trace("routing " + queue.getID() + " is ignored as routing type did not match");
+ }
}
}
@Override
public void routeWithAck(Message message, RoutingContext context) throws Exception {
if (isMatchRoutingType(context)) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Message " + message + " routed with ack on queue " + queue.getID());
+ }
queue.routeWithAck(message, context);
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b0d8063..5764a28 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -934,7 +934,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
}
if (logger.isTraceEnabled()) {
- logger.trace("Message after routed=" + message);
+ logger.trace("Message after routed=" + message + "\n" + context.toString());
}
try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index bfde7af..82091f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -83,5 +83,4 @@ public interface RoutingContext {
boolean isReusable(Message message, int version);
-
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 2c92763..c63f524 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.core.server.impl;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -30,9 +32,12 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
public final class RoutingContextImpl implements RoutingContext {
+ private static final Logger logger = Logger.getLogger(RoutingContextImpl.class);
+
// The pair here is Durable and NonDurable
private final Map<SimpleString, RouteContextList> map = new HashMap<>();
@@ -129,6 +134,27 @@ public final class RoutingContextImpl implements RoutingContext {
}
@Override
+ public String toString() {
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter printWriter = new PrintWriter(stringWriter);
+ printWriter.println("RoutingContextImpl(Address=" + this.address + ", routingType=" + this.routingType + ", PreviousAddress=" + previousAddress + " previousRoute:" + previousRoutingType + ", reusable=" + this.reusable + ", version=" + version + ")");
+ for (Map.Entry<SimpleString, RouteContextList> entry : map.entrySet()) {
+ printWriter.println("..................................................");
+ printWriter.println("***** durable queues " + entry.getKey() + ":");
+ for (Queue queue : entry.getValue().getDurableQueues()) {
+ printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
+ }
+ printWriter.println("***** non durable for " + entry.getKey() + ":");
+ for (Queue queue : entry.getValue().getNonDurableQueues()) {
+ printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
+ }
+ }
+ printWriter.println("..................................................");
+
+ return stringWriter.toString();
+ }
+
+ @Override
public void processReferences(final List<MessageReference> refs, final boolean direct) {
internalprocessReferences(refs, direct);
}
@@ -163,11 +189,17 @@ public final class RoutingContextImpl implements RoutingContext {
@Override
public void setAddress(SimpleString address) {
+ if (this.address == null || !this.address.equals(address)) {
+ this.clear();
+ }
this.address = address;
}
@Override
public void setRoutingType(RoutingType routingType) {
+ if (this.routingType == null || this.routingType != routingType) {
+ this.clear();
+ }
this.routingType = routingType;
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
index df2bff5..373d91c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
@@ -41,7 +41,7 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
- server = createServer(false, createDefaultInVMConfig());
+ server = createServer();
server.start();
locator = createLocator();
@@ -49,6 +49,10 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
session = addClientSession(sf.createSession(false, true, true));
}
+ protected ActiveMQServer createServer() throws Exception {
+ return createServer(false, createDefaultInVMConfig());
+ }
+
protected ServerLocator createLocator() {
return createInVMNonHALocator();
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java
new file mode 100644
index 0000000..8e12033
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MixRoutingTest extends SingleServerTestBase {
+ // Constants -----------------------------------------------------
+
+ private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+ private static final long CONNECTION_TTL = 2000;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ return createServer(false, createDefaultNettyConfig());
+ }
+
+ @Test
+ public void testMix() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString(getName());
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+ Queue queue = session.createQueue(queueName.toString());
+
+ MessageProducer prodTemp = session.createProducer(temporaryQueue);
+ MessageProducer prodQueue = session.createProducer(queue);
+
+ final int NMESSAGES = 100;
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage tmpMessage = session.createTextMessage("tmp");
+ tmpMessage.setIntProperty("i", i);
+ TextMessage permanent = session.createTextMessage("permanent");
+ permanent.setIntProperty("i", i);
+ prodQueue.send(permanent);
+ prodTemp.send(tmpMessage);
+ }
+
+ MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
+ MessageConsumer consumerQueue = session.createConsumer(queue);
+ connection.start();
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
+ TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
+ Assert.assertNotNull(tmpMessage);
+ Assert.assertNotNull(permanent);
+ Assert.assertEquals("tmp", tmpMessage.getText());
+ Assert.assertEquals("permanent", permanent.getText());
+ Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
+ Assert.assertEquals(i, permanent.getIntProperty("i"));
+ }
+
+ Assert.assertNull(consumerQueue.receiveNoWait());
+ Assert.assertNull(consumerTemp.receiveNoWait());
+ connection.close();
+ factory.close();
+ }
+
+ @Test
+ public void testMix2() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString(getName());
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName.toString());
+
+ MessageProducer prodQueue = session.createProducer(queue);
+
+ final int NMESSAGES = 100;
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage permanent = session.createTextMessage("permanent");
+ permanent.setIntProperty("i", i);
+ prodQueue.send(permanent);
+ }
+
+ TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+ MessageProducer prodTemp = session.createProducer(temporaryQueue);
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage tmpMessage = session.createTextMessage("tmp");
+ tmpMessage.setIntProperty("i", i);
+ prodTemp.send(tmpMessage);
+ }
+
+ MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
+ MessageConsumer consumerQueue = session.createConsumer(queue);
+ connection.start();
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
+ TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
+ Assert.assertNotNull(tmpMessage);
+ Assert.assertNotNull(permanent);
+ Assert.assertEquals("tmp", tmpMessage.getText());
+ Assert.assertEquals("permanent", permanent.getText());
+ Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
+ Assert.assertEquals(i, permanent.getIntProperty("i"));
+ }
+
+ Assert.assertNull(consumerQueue.receiveNoWait());
+ Assert.assertNull(consumerTemp.receiveNoWait());
+ connection.close();
+ factory.close();
+ }
+
+ @Test
+ public void testMixWithTopics() throws Exception {
+ SimpleString queueName = SimpleString.toSimpleString(getName());
+ SimpleString topicName = SimpleString.toSimpleString("topic" + getName());
+ AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
+ server.addAddressInfo(info);
+ server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue queue = session.createQueue(queueName.toString());
+ Topic topic = session.createTopic(topicName.toString());
+
+ MessageProducer prodQueue = session.createProducer(queue);
+ MessageProducer prodTopic = session.createProducer(topic);
+
+ final int NMESSAGES = 10;
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage topicMessage = session.createTextMessage("topic");
+ topicMessage.setIntProperty("i", i);
+ TextMessage permanent = session.createTextMessage("permanent");
+ permanent.setIntProperty("i", i);
+ prodQueue.send(permanent);
+ prodTopic.send(topicMessage);
+ }
+
+ MessageConsumer consumerQueue = session.createConsumer(queue);
+ connection.start();
+
+ for (int i = 0; i < NMESSAGES; i++) {
+ TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
+ Assert.assertNotNull(permanent);
+ Assert.assertEquals("permanent", permanent.getText());
+ Assert.assertEquals(i, permanent.getIntProperty("i"));
+ }
+
+ Assert.assertNull(consumerQueue.receiveNoWait());
+ connection.close();
+ factory.close();
+ }
+
+}