You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/30 13:27:31 UTC

[11/27] activemq-artemis git commit: Added RoutingType to message

Added RoutingType to message


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fea23a1c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fea23a1c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fea23a1c

Branch: refs/heads/ARTEMIS-780
Commit: fea23a1c7c7bb8e96f1a9d13b156e81ffecb0b49
Parents: fa926e4
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 29 15:24:31 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 29 15:36:41 2016 +0000

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  5 +++
 .../wireformat/CreateSharedQueueMessage_V2.java |  1 -
 .../jms/client/ActiveMQMessageProducer.java     |  4 ++
 .../core/postoffice/impl/LocalQueueBinding.java | 18 ++++++--
 .../core/postoffice/impl/PostOfficeImpl.java    |  5 +++
 .../artemis/jms/tests/MessageProducerTest.java  | 46 ++++++++++++++++++++
 6 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 1ea9309..80116ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -112,6 +112,11 @@ public interface Message {
     */
    SimpleString HDR_VALIDATED_USER = new SimpleString("_AMQ_VALIDATED_USER");
 
+   /**
+    * The Routing Type for this message.  Ensures that this message is only routed to queues with matching routing type.
+    */
+   SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
+
    byte DEFAULT_TYPE = 0;
 
    byte OBJECT_TYPE = 2;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 40b9cb5..c8bf86e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -64,7 +64,6 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       return buff.toString();
    }
 
-
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       buffer.writeSimpleString(address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 47d9ff2..aa4754b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.utils.UUID;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -491,6 +492,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
       coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
 
+      byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
+      coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType);
+
       try {
          /**
           * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 30e3768..d02f0f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -18,12 +18,13 @@ package org.apache.activemq.artemis.core.postoffice.impl;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.Bindable;
-import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 
 public class LocalQueueBinding implements QueueBinding {
@@ -117,12 +118,23 @@ public class LocalQueueBinding implements QueueBinding {
 
    @Override
    public void route(final ServerMessage message, final RoutingContext context) throws Exception {
-      queue.route(message, context);
+      if (isMatchRoutingType(message)) {
+         queue.route(message, context);
+      }
    }
 
    @Override
    public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
-      queue.routeWithAck(message, context);
+      if (isMatchRoutingType(message)) {
+         queue.routeWithAck(message, context);
+      }
+   }
+
+   private boolean isMatchRoutingType(ServerMessage message) {
+      if (message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) {
+         return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType();
+      }
+      return true;
    }
 
    public boolean isQueueBinding() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index dc73680..2fc3409 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -655,6 +655,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                               final RoutingContext context,
                               final boolean direct,
                               boolean rejectDuplicates) throws Exception {
+
       RoutingStatus result = RoutingStatus.OK;
       // Sanity check
       if (message.getRefCount() > 0) {
@@ -663,6 +664,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       SimpleString address = message.getAddress();
 
+      if (address.toString().equals("testQueue")) {
+         System.out.println("f");
+      }
+
       setPagingStore(message);
 
       AtomicBoolean startedTX = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
index d001f5b..c5fb964 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
@@ -25,19 +25,29 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage;
 import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage;
 import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
 import org.junit.Test;
 
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
 public class MessageProducerTest extends JMSTestCase {
 
    @Test
@@ -695,6 +705,42 @@ public class MessageProducerTest extends JMSTestCase {
 
       ProxyAssertSupport.assertTrue(listener.exception instanceof javax.jms.IllegalStateException);
    }
+
+   @Test
+   public void testSendToQueueOnlyWhenTopicWithSameAddress() throws Exception {
+      SimpleString addr = SimpleString.toSimpleString("testAddr");
+
+      Set<RoutingType> supportedRoutingTypes = new HashSet<>();
+      supportedRoutingTypes.add(RoutingType.ANYCAST);
+      supportedRoutingTypes.add(RoutingType.MULTICAST);
+
+      servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(addr, supportedRoutingTypes));
+      servers.get(0).getActiveMQServer().createQueue(addr, RoutingType.ANYCAST, addr, null, false, false);
+
+      Connection pconn = createConnection();
+      pconn.start();
+
+      Session ps = pconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      Queue queue = ps.createQueue(addr.toString());
+      Topic topic = ps.createTopic(addr.toString());
+
+      MessageConsumer queueConsumer = ps.createConsumer(queue);
+      MessageConsumer topicConsumer = ps.createConsumer(topic);
+
+      MessageProducer queueProducer = ps.createProducer(queue);
+      queueProducer.send(ps.createMessage());
+
+      assertNotNull(queueConsumer.receive(1000));
+      assertNull(topicConsumer.receive(1000));
+
+      MessageProducer topicProducer = ps.createProducer(topic);
+      topicProducer.send(ps.createMessage());
+
+      assertNull(queueConsumer.receive(1000));
+      assertNotNull(topicConsumer.receive(1000));
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------