You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/05 06:17:40 UTC
activemq-artemis git commit: delivery type
Repository: activemq-artemis
Updated Branches:
refs/heads/artemis-1009 72860f652 -> 4ade9b212
delivery type
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ade9b21
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ade9b21
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ade9b21
Branch: refs/heads/artemis-1009
Commit: 4ade9b212199ca198582460484c5a97fb02b35bf
Parents: 72860f6
Author: Clebert Suconic <cl...@apache.org>
Authored: Sun Mar 5 01:17:30 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sun Mar 5 01:17:30 2017 -0500
----------------------------------------------------------------------
.../activemq/artemis/api/core/Message.java | 2 ++
.../artemis/core/message/impl/CoreMessage.java | 9 +++++++
.../protocol/amqp/broker/AMQPMessage.java | 26 +++++++++++++++++++-
.../amqp/converter/AMQPMessageSupport.java | 2 ++
.../core/protocol/openwire/OpenwireMessage.java | 6 +++++
.../core/ServerSessionPacketHandler.java | 2 +-
.../artemis/core/server/ServerSession.java | 1 +
.../core/server/impl/ServerSessionImpl.java | 23 ++++++++++-------
.../impl/ScheduledDeliveryHandlerTest.java | 5 ++++
.../integration/amqp/AmqpSendReceiveTest.java | 21 ----------------
.../integration/client/AcknowledgeTest.java | 6 +++++
11 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index f4f0e84..496a532 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -178,6 +178,8 @@ public interface Message {
// only on core
}
+ RoutingType getRouteType();
+
boolean containsDeliveryAnnotationProperty(SimpleString property);
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index d63ec2c..1e4087f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.encode.BodyType;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -160,6 +161,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
}
@Override
+ public RoutingType getRouteType() {
+ if (containsProperty(Message.HDR_ROUTING_TYPE)) {
+ return RoutingType.getType(getByteProperty(Message.HDR_ROUTING_TYPE));
+ }
+ return null;
+ }
+
+ @Override
public CoreMessage setReplyTo(SimpleString address) {
if (address == null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 9a22379..13bb099 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -30,9 +30,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -172,16 +174,38 @@ public class AMQPMessage extends RefCountMessage {
}
private Object getSymbol(String symbol) {
+ return getSymbol(Symbol.getSymbol(symbol));
+ }
+
+ private Object getSymbol(Symbol symbol) {
MessageAnnotations annotations = getMessageAnnotations();
Map mapAnnotations = annotations != null ? annotations.getValue() : null;
if (mapAnnotations != null) {
- return mapAnnotations.get(Symbol.getSymbol(symbol));
+ return mapAnnotations.get(symbol);
}
return null;
}
@Override
+ public RoutingType getRouteType() {
+
+ switch (((Byte)type).byteValue()) {
+ case AMQPMessageSupport.QUEUE_TYPE:
+ case AMQPMessageSupport.TEMP_QUEUE_TYPE:
+ return RoutingType.ANYCAST;
+
+ case AMQPMessageSupport.TOPIC_TYPE:
+ case AMQPMessageSupport.TEMP_TOPIC_TYPE:
+ return RoutingType.MULTICAST;
+ default:
+ return null;
+ }
+ }
+
+
+
+ @Override
public Long getScheduledDeliveryTime() {
if (scheduledTime < 0) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 0a7a049..351c1a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -50,6 +50,8 @@ import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
*/
public final class AMQPMessageSupport {
+ public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
+
// Message Properties used to map AMQP to JMS and back
/**
* Attribute used to mark the class type of JMS message that a particular message
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 6c86751..186900b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.Persister;
@@ -42,6 +43,11 @@ public class OpenwireMessage implements Message {
}
@Override
+ public RoutingType getRouteType() {
+ return null;
+ }
+
+ @Override
public SimpleString getReplyTo() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index fc0885f..92cae64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -745,7 +745,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
}
- session.doSend(session.getCurrentTransaction(), currentLargeMessage, false, false);
+ session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
currentLargeMessage = null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 1899d65..0ce0728 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -188,6 +188,7 @@ public interface ServerSession extends SecurityAuth {
RoutingStatus doSend(final Transaction tx,
final Message msg,
+ final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 5361983..1cd19bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1312,6 +1312,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
}
+ SimpleString originalAddress = message.getAddressSimpleString();
+
SimpleString address = removePrefix(message.getAddressSimpleString());
// In case the prefix was removed, we also need to update the message
@@ -1342,7 +1344,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
handleManagementMessage(tx, message, direct);
} else {
- result = doSend(tx, message, direct, noAutoCreateQueue);
+ result = doSend(tx, message, originalAddress, direct, noAutoCreateQueue);
}
return result;
}
@@ -1560,7 +1562,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
reply.setAddress(replyTo);
- doSend(tx, reply, direct, false);
+ doSend(tx, reply, null, direct, false);
}
return RoutingStatus.OK;
@@ -1620,18 +1622,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public RoutingStatus doSend(final Transaction tx,
final Message msg,
+ final SimpleString originalAddress,
final boolean direct,
final boolean noAutoCreateQueue) throws Exception {
RoutingStatus result = RoutingStatus.OK;
- /**
- * TODO Checking message properties on each message is expensive. Instead we should update the API and Core Packets
- * to add the RoutingType information directly.
- */
- RoutingType routingType = null;
- if (msg.containsProperty(Message.HDR_ROUTING_TYPE)) {
- routingType = RoutingType.getType(msg.getByteProperty(Message.HDR_ROUTING_TYPE));
+ RoutingType routingType = msg.getRouteType();
+
+ if (originalAddress != null) {
+ if (originalAddress.toString().startsWith("anycast:")) {
+ routingType = RoutingType.ANYCAST;
+ } else if (originalAddress.toString().startsWith("multicast:")) {
+ routingType = RoutingType.MULTICAST;
+ }
}
+
Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
// Consumer
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index faf8b12..2bd8cb2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -284,6 +284,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
class FakeMessage extends RefCountMessage {
@Override
+ public RoutingType getRouteType() {
+ return null;
+ }
+
+ @Override
public SimpleString getReplyTo() {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 0f006bc..70ff658 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -243,27 +243,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
}
- @Test
- public void testAmbiguousMessageRouting() throws Exception {
- final String addressA = "addressA";
- final String queueA = "queueA";
- final String queueB = "queueB";
- final String queueC = "queueC";
- final String queueD = "queueD";
-
- ActiveMQServerControl serverControl = server.getActiveMQServerControl();
- serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
- serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
- serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
-
- sendMessages(addressA, 1);
-
- assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
- assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
- }
-
@Test(timeout = 60000)
public void testMessageDurableFalse() throws Exception {
sendMessages(getTestName(), 1, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 08d9787..042effd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -341,6 +342,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
final long id;
@Override
+ public RoutingType getRouteType() {
+ return null;
+ }
+
+ @Override
public SimpleString getReplyTo() {
return null;
}