You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/01 06:07:17 UTC

[activemq-artemis] branch master updated: ARTEMIS-2328 Routing after empty addresses could lead to invalid messages

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new fa259ba  ARTEMIS-2328 Routing after empty addresses could lead to invalid messages
     new bede603  This closes #2656
fa259ba is described below

commit fa259ba66e6fa653b90d81f64d5162b8f5d7b341
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed May 1 00:29:31 2019 -0400

    ARTEMIS-2328 Routing after empty addresses could lead to invalid messages
---
 .../artemis/core/postoffice/impl/BindingsImpl.java |  21 ++-
 .../core/postoffice/impl/LocalQueueBinding.java    |  13 ++
 .../core/postoffice/impl/PostOfficeImpl.java       |   2 +-
 .../artemis/core/server/RoutingContext.java        |   1 -
 .../core/server/impl/RoutingContextImpl.java       |  32 ++++
 .../artemis/tests/util/SingleServerTestBase.java   |   6 +-
 .../tests/integration/client/MixRoutingTest.java   | 197 +++++++++++++++++++++
 7 files changed, 260 insertions(+), 12 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index c3bf31a..f5bb2a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -282,7 +282,8 @@ public final class BindingsImpl implements Bindings {
    private void route(final Message message,
                       final RoutingContext context,
                       final boolean groupRouting) throws Exception {
-      boolean reusableContext = context.isReusable(message, version.get());
+      int currentVersion = version.get();
+      boolean reusableContext = context.isReusable(message, currentVersion);
 
       if (!reusableContext) {
          context.clear();
@@ -310,13 +311,18 @@ public final class BindingsImpl implements Bindings {
 
       boolean routed = false;
 
+      boolean hasExclusives = false;
+
       for (Binding binding : exclusiveBindings) {
+         if (!hasExclusives) {
+            context.clear().setReusable(false);
+            hasExclusives = true;
+         }
 
          if (binding.getFilter() == null || binding.getFilter().match(message)) {
             binding.getBindable().route(message, context);
             routed = true;
          }
-         context.setReusable(false);
       }
       if (!routed) {
          // Remove the ids now, in order to avoid double check
@@ -332,6 +338,7 @@ public final class BindingsImpl implements Bindings {
             context.clear().setReusable(false);
             routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
          } else if (CompositeAddress.isFullyQualified(message.getAddress())) {
+            context.clear().setReusable(false);
             Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
             if (theBinding != null) {
                theBinding.route(message, context);
@@ -340,21 +347,17 @@ public final class BindingsImpl implements Bindings {
             // in a optimization, we are reusing the previous context if everything is right for it
             // so the simpleRouting will only happen if needed
             if (!reusableContext) {
-               simpleRouting(message, context);
+               simpleRouting(message, context, currentVersion);
             }
          }
       }
    }
 
-   private void simpleRouting(Message message, RoutingContext context) throws Exception {
+   private void simpleRouting(Message message, RoutingContext context, int currentVersion) throws Exception {
       if (logger.isTraceEnabled()) {
-         logger.trace("Routing message " + message + " on binding=" + this);
+         logger.trace("Routing message " + message + " on binding=" + this + " current context::" + context);
       }
 
-      // We check at the version before we started routing,
-      // this is because if something changed in between we want to check the correct version
-      int currentVersion = version.get();
-
       for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet()) {
          SimpleString routingName = entry.getKey();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 79ab4d3..ed5eb3b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -25,9 +25,12 @@ import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
+import org.jboss.logging.Logger;
 
 public class LocalQueueBinding implements QueueBinding {
 
+   private static final Logger logger = Logger.getLogger(LocalQueueBinding.class);
+
    private final SimpleString address;
 
    private final Queue queue;
@@ -119,13 +122,23 @@ public class LocalQueueBinding implements QueueBinding {
    @Override
    public void route(final Message message, final RoutingContext context) throws Exception {
       if (isMatchRoutingType(context)) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("adding routing " + queue.getID() + " on message " + message);
+         }
          queue.route(message, context);
+      } else {
+         if (logger.isTraceEnabled()) {
+            logger.trace("routing " + queue.getID() + " is ignored as routing type did not match");
+         }
       }
    }
 
    @Override
    public void routeWithAck(Message message, RoutingContext context) throws Exception {
       if (isMatchRoutingType(context)) {
+         if (logger.isTraceEnabled()) {
+            logger.trace("Message " + message + " routed with ack on queue " + queue.getID());
+         }
          queue.routeWithAck(message, context);
       }
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index b0d8063..5764a28 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -934,7 +934,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       }
 
       if (logger.isTraceEnabled()) {
-         logger.trace("Message after routed=" + message);
+         logger.trace("Message after routed=" + message + "\n" + context.toString());
       }
 
       try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
index bfde7af..82091f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java
@@ -83,5 +83,4 @@ public interface RoutingContext {
    boolean isReusable(Message message, int version);
 
 
-
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
index 2c92763..c63f524 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RoutingContextImpl.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -30,9 +32,12 @@ import org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.jboss.logging.Logger;
 
 public final class RoutingContextImpl implements RoutingContext {
 
+   private static final Logger logger  = Logger.getLogger(RoutingContextImpl.class);
+
    // The pair here is Durable and NonDurable
    private final Map<SimpleString, RouteContextList> map = new HashMap<>();
 
@@ -129,6 +134,27 @@ public final class RoutingContextImpl implements RoutingContext {
    }
 
    @Override
+   public String toString() {
+      StringWriter stringWriter = new StringWriter();
+      PrintWriter printWriter = new PrintWriter(stringWriter);
+      printWriter.println("RoutingContextImpl(Address=" + this.address + ", routingType=" + this.routingType + ", PreviousAddress=" + previousAddress + " previousRoute:" + previousRoutingType + ", reusable=" + this.reusable + ", version=" + version + ")");
+      for (Map.Entry<SimpleString, RouteContextList> entry : map.entrySet()) {
+         printWriter.println("..................................................");
+         printWriter.println("***** durable queues " + entry.getKey() + ":");
+         for (Queue queue : entry.getValue().getDurableQueues()) {
+            printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
+         }
+         printWriter.println("***** non durable for " + entry.getKey() + ":");
+         for (Queue queue : entry.getValue().getNonDurableQueues()) {
+            printWriter.println("- queueID=" + queue.getID() + " address:" + queue.getAddress() + " name:" + queue.getName() + " filter:" + queue.getFilter());
+         }
+      }
+      printWriter.println("..................................................");
+
+      return stringWriter.toString();
+   }
+
+   @Override
    public void processReferences(final List<MessageReference> refs, final boolean direct) {
       internalprocessReferences(refs, direct);
    }
@@ -163,11 +189,17 @@ public final class RoutingContextImpl implements RoutingContext {
 
    @Override
    public void setAddress(SimpleString address) {
+      if (this.address == null || !this.address.equals(address)) {
+         this.clear();
+      }
       this.address = address;
    }
 
    @Override
    public void setRoutingType(RoutingType routingType) {
+      if (this.routingType == null || this.routingType != routingType) {
+         this.clear();
+      }
       this.routingType = routingType;
    }
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
index df2bff5..373d91c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/SingleServerTestBase.java
@@ -41,7 +41,7 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      server = createServer(false, createDefaultInVMConfig());
+      server = createServer();
       server.start();
 
       locator = createLocator();
@@ -49,6 +49,10 @@ public abstract class SingleServerTestBase extends ActiveMQTestBase {
       session = addClientSession(sf.createSession(false, true, true));
    }
 
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(false, createDefaultInVMConfig());
+   }
+
    protected ServerLocator createLocator() {
       return createInVMNonHALocator();
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java
new file mode 100644
index 0000000..8e12033
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MixRoutingTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.SingleServerTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MixRoutingTest extends SingleServerTestBase {
+   // Constants -----------------------------------------------------
+
+   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   private static final long CONNECTION_TTL = 2000;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(false, createDefaultNettyConfig());
+   }
+
+   @Test
+   public void testMix() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString(getName());
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+      Queue queue = session.createQueue(queueName.toString());
+
+      MessageProducer prodTemp = session.createProducer(temporaryQueue);
+      MessageProducer prodQueue = session.createProducer(queue);
+
+      final int NMESSAGES = 100;
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage tmpMessage = session.createTextMessage("tmp");
+         tmpMessage.setIntProperty("i", i);
+         TextMessage permanent = session.createTextMessage("permanent");
+         permanent.setIntProperty("i", i);
+         prodQueue.send(permanent);
+         prodTemp.send(tmpMessage);
+      }
+
+      MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
+      MessageConsumer consumerQueue = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
+         TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
+         Assert.assertNotNull(tmpMessage);
+         Assert.assertNotNull(permanent);
+         Assert.assertEquals("tmp", tmpMessage.getText());
+         Assert.assertEquals("permanent", permanent.getText());
+         Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
+         Assert.assertEquals(i, permanent.getIntProperty("i"));
+      }
+
+      Assert.assertNull(consumerQueue.receiveNoWait());
+      Assert.assertNull(consumerTemp.receiveNoWait());
+      connection.close();
+      factory.close();
+   }
+
+   @Test
+   public void testMix2() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString(getName());
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Queue queue = session.createQueue(queueName.toString());
+
+      MessageProducer prodQueue = session.createProducer(queue);
+
+      final int NMESSAGES = 100;
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage permanent = session.createTextMessage("permanent");
+         permanent.setIntProperty("i", i);
+         prodQueue.send(permanent);
+      }
+
+      TemporaryQueue temporaryQueue = session.createTemporaryQueue();
+      MessageProducer prodTemp = session.createProducer(temporaryQueue);
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage tmpMessage = session.createTextMessage("tmp");
+         tmpMessage.setIntProperty("i", i);
+         prodTemp.send(tmpMessage);
+      }
+
+      MessageConsumer consumerTemp = session.createConsumer(temporaryQueue);
+      MessageConsumer consumerQueue = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage tmpMessage = (TextMessage) consumerTemp.receive(5000);
+         TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
+         Assert.assertNotNull(tmpMessage);
+         Assert.assertNotNull(permanent);
+         Assert.assertEquals("tmp", tmpMessage.getText());
+         Assert.assertEquals("permanent", permanent.getText());
+         Assert.assertEquals(i, tmpMessage.getIntProperty("i"));
+         Assert.assertEquals(i, permanent.getIntProperty("i"));
+      }
+
+      Assert.assertNull(consumerQueue.receiveNoWait());
+      Assert.assertNull(consumerTemp.receiveNoWait());
+      connection.close();
+      factory.close();
+   }
+
+   @Test
+   public void testMixWithTopics() throws Exception {
+      SimpleString queueName = SimpleString.toSimpleString(getName());
+      SimpleString topicName = SimpleString.toSimpleString("topic" + getName());
+      AddressInfo info = new AddressInfo(topicName, RoutingType.MULTICAST);
+      server.addAddressInfo(info);
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false);
+      ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Queue queue = session.createQueue(queueName.toString());
+      Topic topic = session.createTopic(topicName.toString());
+
+      MessageProducer prodQueue = session.createProducer(queue);
+      MessageProducer prodTopic = session.createProducer(topic);
+
+      final int NMESSAGES = 10;
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage topicMessage = session.createTextMessage("topic");
+         topicMessage.setIntProperty("i", i);
+         TextMessage permanent = session.createTextMessage("permanent");
+         permanent.setIntProperty("i", i);
+         prodQueue.send(permanent);
+         prodTopic.send(topicMessage);
+      }
+
+      MessageConsumer consumerQueue = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < NMESSAGES; i++) {
+         TextMessage permanent = (TextMessage) consumerQueue.receive(5000);
+         Assert.assertNotNull(permanent);
+         Assert.assertEquals("permanent", permanent.getText());
+         Assert.assertEquals(i, permanent.getIntProperty("i"));
+      }
+
+      Assert.assertNull(consumerQueue.receiveNoWait());
+      connection.close();
+      factory.close();
+   }
+
+}