You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/30 13:27:31 UTC
[11/27] activemq-artemis git commit: Added RoutingType to message
Added RoutingType to message
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/fea23a1c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/fea23a1c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/fea23a1c
Branch: refs/heads/ARTEMIS-780
Commit: fea23a1c7c7bb8e96f1a9d13b156e81ffecb0b49
Parents: fa926e4
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 29 15:24:31 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 29 15:36:41 2016 +0000
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 5 +++
.../wireformat/CreateSharedQueueMessage_V2.java | 1 -
.../jms/client/ActiveMQMessageProducer.java | 4 ++
.../core/postoffice/impl/LocalQueueBinding.java | 18 ++++++--
.../core/postoffice/impl/PostOfficeImpl.java | 5 +++
.../artemis/jms/tests/MessageProducerTest.java | 46 ++++++++++++++++++++
6 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 1ea9309..80116ed 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -112,6 +112,11 @@ public interface Message {
*/
SimpleString HDR_VALIDATED_USER = new SimpleString("_AMQ_VALIDATED_USER");
+ /**
+ * The routing type for this message. Ensures that the message is routed only to queues whose routing type matches.
+ */
+ SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
+
byte DEFAULT_TYPE = 0;
byte OBJECT_TYPE = 2;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 40b9cb5..c8bf86e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -64,7 +64,6 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
return buff.toString();
}
-
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
buffer.writeSimpleString(address);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 47d9ff2..aa4754b 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
+import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
@@ -491,6 +492,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
+ byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
+ coreMessage.putByteProperty(MessageImpl.HDR_ROUTING_TYPE, routingType);
+
try {
/**
* Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
index 30e3768..d02f0f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/LocalQueueBinding.java
@@ -18,12 +18,13 @@ package org.apache.activemq.artemis.core.postoffice.impl;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.postoffice.BindingType;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.server.Bindable;
-import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.ServerMessage;
public class LocalQueueBinding implements QueueBinding {
@@ -117,12 +118,23 @@ public class LocalQueueBinding implements QueueBinding {
@Override
public void route(final ServerMessage message, final RoutingContext context) throws Exception {
- queue.route(message, context);
+ if (isMatchRoutingType(message)) {
+ queue.route(message, context);
+ }
}
@Override
public void routeWithAck(ServerMessage message, RoutingContext context) throws Exception {
- queue.routeWithAck(message, context);
+ if (isMatchRoutingType(message)) {
+ queue.routeWithAck(message, context);
+ }
+ }
+
+ private boolean isMatchRoutingType(ServerMessage message) {
+ if (!message.containsProperty(MessageInternal.HDR_ROUTING_TYPE)) {
+ return true;
+ }
+ return message.getByteProperty(MessageInternal.HDR_ROUTING_TYPE) == queue.getRoutingType().getType();
}
public boolean isQueueBinding() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index dc73680..2fc3409 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -655,6 +655,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
final RoutingContext context,
final boolean direct,
boolean rejectDuplicates) throws Exception {
+
RoutingStatus result = RoutingStatus.OK;
// Sanity check
if (message.getRefCount() > 0) {
@@ -663,6 +664,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
SimpleString address = message.getAddress();
+ if (address.toString().equals("testQueue")) {
+ System.out.println("f");
+ }
+
setPagingStore(message);
AtomicBoolean startedTX = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/fea23a1c/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
----------------------------------------------------------------------
diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
index d001f5b..c5fb964 100644
--- a/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
+++ b/tests/jms-tests/src/test/java/org/apache/activemq/artemis/jms/tests/MessageProducerTest.java
@@ -25,19 +25,29 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSMessage;
import org.apache.activemq.artemis.jms.tests.message.SimpleJMSTextMessage;
import org.apache.activemq.artemis.jms.tests.util.ProxyAssertSupport;
import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
public class MessageProducerTest extends JMSTestCase {
@Test
@@ -695,6 +705,42 @@ public class MessageProducerTest extends JMSTestCase {
ProxyAssertSupport.assertTrue(listener.exception instanceof javax.jms.IllegalStateException);
}
+
+ @Test
+ public void testSendToQueueOnlyWhenTopicWithSameAddress() throws Exception {
+ SimpleString address = SimpleString.toSimpleString("testAddr");
+ // One address registered with both routing types, backed by a single ANYCAST queue.
+ Set<RoutingType> routingTypes = new HashSet<>();
+ routingTypes.add(RoutingType.ANYCAST);
+ routingTypes.add(RoutingType.MULTICAST);
+
+ servers.get(0).getActiveMQServer().createAddressInfo(new AddressInfo(address, routingTypes));
+ servers.get(0).getActiveMQServer().createQueue(address, RoutingType.ANYCAST, address, null, false, false);
+
+ Connection conn = createConnection();
+ conn.start();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // A JMS queue and a JMS topic that share the same address name.
+ Queue queue = session.createQueue(address.toString());
+ Topic topic = session.createTopic(address.toString());
+
+ MessageConsumer queueConsumer = session.createConsumer(queue);
+ MessageConsumer topicConsumer = session.createConsumer(topic);
+ // A message sent through the queue must reach only the queue consumer.
+ MessageProducer queueProducer = session.createProducer(queue);
+ queueProducer.send(session.createMessage());
+
+ assertNotNull(queueConsumer.receive(1000));
+ assertNull(topicConsumer.receive(1000));
+ // A message sent through the topic must reach only the topic consumer.
+ MessageProducer topicProducer = session.createProducer(topic);
+ topicProducer.send(session.createMessage());
+
+ assertNull(queueConsumer.receive(1000));
+ assertNotNull(topicConsumer.receive(1000));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------