You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/05 06:17:40 UTC

activemq-artemis git commit: delivery type

Repository: activemq-artemis
Updated Branches:
  refs/heads/artemis-1009 72860f652 -> 4ade9b212


delivery type


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ade9b21
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ade9b21
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ade9b21

Branch: refs/heads/artemis-1009
Commit: 4ade9b212199ca198582460484c5a97fb02b35bf
Parents: 72860f6
Author: Clebert Suconic <cl...@apache.org>
Authored: Sun Mar 5 01:17:30 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sun Mar 5 01:17:30 2017 -0500

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  2 ++
 .../artemis/core/message/impl/CoreMessage.java  |  9 +++++++
 .../protocol/amqp/broker/AMQPMessage.java       | 26 +++++++++++++++++++-
 .../amqp/converter/AMQPMessageSupport.java      |  2 ++
 .../core/protocol/openwire/OpenwireMessage.java |  6 +++++
 .../core/ServerSessionPacketHandler.java        |  2 +-
 .../artemis/core/server/ServerSession.java      |  1 +
 .../core/server/impl/ServerSessionImpl.java     | 23 ++++++++++-------
 .../impl/ScheduledDeliveryHandlerTest.java      |  5 ++++
 .../integration/amqp/AmqpSendReceiveTest.java   | 21 ----------------
 .../integration/client/AcknowledgeTest.java     |  6 +++++
 11 files changed, 71 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index f4f0e84..496a532 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -178,6 +178,8 @@ public interface Message {
       // only on core
    }
 
+   RoutingType getRouteType();
+
    boolean containsDeliveryAnnotationProperty(SimpleString property);
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index d63ec2c..1e4087f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.encode.BodyType;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
@@ -160,6 +161,14 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
    }
 
    @Override
+   public RoutingType getRouteType() {
+      if (containsProperty(Message.HDR_ROUTING_TYPE)) {
+         return RoutingType.getType(getByteProperty(Message.HDR_ROUTING_TYPE));
+      }
+      return null;
+   }
+
+   @Override
    public CoreMessage setReplyTo(SimpleString address) {
 
       if (address == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 9a22379..13bb099 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -30,9 +30,11 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -172,16 +174,38 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    private Object getSymbol(String symbol) {
+      return getSymbol(Symbol.getSymbol(symbol));
+   }
+
+   private Object getSymbol(Symbol symbol) {
       MessageAnnotations annotations = getMessageAnnotations();
       Map mapAnnotations = annotations != null ? annotations.getValue() : null;
       if (mapAnnotations != null) {
-         return mapAnnotations.get(Symbol.getSymbol(symbol));
+         return mapAnnotations.get(symbol);
       }
 
       return null;
    }
 
    @Override
+   public RoutingType getRouteType() {
+
+      switch (((Byte)type).byteValue()) {
+         case AMQPMessageSupport.QUEUE_TYPE:
+         case AMQPMessageSupport.TEMP_QUEUE_TYPE:
+            return RoutingType.ANYCAST;
+
+         case AMQPMessageSupport.TOPIC_TYPE:
+         case AMQPMessageSupport.TEMP_TOPIC_TYPE:
+            return RoutingType.MULTICAST;
+         default:
+            return null;
+      }
+   }
+
+
+
+   @Override
    public Long getScheduledDeliveryTime() {
 
       if (scheduledTime < 0) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 0a7a049..351c1a6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -50,6 +50,8 @@ import static org.apache.activemq.artemis.api.core.Message.TEXT_TYPE;
  */
 public final class AMQPMessageSupport {
 
+   public static final String JMS_REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-jms-reply-to";
+
    // Message Properties used to map AMQP to JMS and back
    /**
     * Attribute used to mark the class type of JMS message that a particular message

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 6c86751..186900b 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessageListener;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.Persister;
 
@@ -42,6 +43,11 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
+   public RoutingType getRouteType() {
+      return null;
+   }
+
+   @Override
    public SimpleString getReplyTo() {
       return null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index fc0885f..92cae64 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -745,7 +745,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
          }
 
 
-         session.doSend(session.getCurrentTransaction(), currentLargeMessage, false, false);
+         session.doSend(session.getCurrentTransaction(), currentLargeMessage, null, false, false);
 
          currentLargeMessage = null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 1899d65..0ce0728 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -188,6 +188,7 @@ public interface ServerSession extends SecurityAuth {
 
    RoutingStatus doSend(final Transaction tx,
                         final Message msg,
+                        final SimpleString originalAddress,
                         final boolean direct,
                         final boolean noAutoCreateQueue) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 5361983..1cd19bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1312,6 +1312,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          message.putStringProperty(Message.HDR_VALIDATED_USER, SimpleString.toSimpleString(validatedUser));
       }
 
+      SimpleString originalAddress = message.getAddressSimpleString();
+
       SimpleString address = removePrefix(message.getAddressSimpleString());
 
       // In case the prefix was removed, we also need to update the message
@@ -1342,7 +1344,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
          handleManagementMessage(tx, message, direct);
       } else {
-         result = doSend(tx, message, direct, noAutoCreateQueue);
+         result = doSend(tx, message, originalAddress, direct, noAutoCreateQueue);
       }
       return result;
    }
@@ -1560,7 +1562,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          }
          reply.setAddress(replyTo);
 
-         doSend(tx, reply, direct, false);
+         doSend(tx, reply, null, direct, false);
       }
 
       return RoutingStatus.OK;
@@ -1620,18 +1622,21 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    @Override
    public RoutingStatus doSend(final Transaction tx,
                                final Message msg,
+                               final SimpleString originalAddress,
                                final boolean direct,
                                final boolean noAutoCreateQueue) throws Exception {
       RoutingStatus result = RoutingStatus.OK;
 
-      /**
-       *  TODO Checking message properties on each message is expensive.  Instead we should update the API and Core Packets
-       *  to add the RoutingType information directly.
-       */
-      RoutingType routingType = null;
-      if (msg.containsProperty(Message.HDR_ROUTING_TYPE)) {
-         routingType = RoutingType.getType(msg.getByteProperty(Message.HDR_ROUTING_TYPE));
+      RoutingType routingType = msg.getRouteType();
+
+      if (originalAddress != null) {
+         if (originalAddress.toString().startsWith("anycast:")) {
+            routingType = RoutingType.ANYCAST;
+         } else if (originalAddress.toString().startsWith("multicast:")) {
+            routingType = RoutingType.MULTICAST;
+         }
       }
+
       Pair<SimpleString, RoutingType> art = getAddressAndRoutingType(msg.getAddressSimpleString(), routingType);
 
       // Consumer

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index faf8b12..2bd8cb2 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -284,6 +284,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
    class FakeMessage extends RefCountMessage {
 
       @Override
+      public RoutingType getRouteType() {
+         return null;
+      }
+
+      @Override
       public SimpleString getReplyTo() {
          return null;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 0f006bc..70ff658 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -243,27 +243,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
       assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
    }
 
-   @Test
-   public void testAmbiguousMessageRouting() throws Exception {
-      final String addressA = "addressA";
-      final String queueA = "queueA";
-      final String queueB = "queueB";
-      final String queueC = "queueC";
-      final String queueD = "queueD";
-
-      ActiveMQServerControl serverControl = server.getActiveMQServerControl();
-      serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString());
-      serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString());
-      serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString());
-
-      sendMessages(addressA, 1);
-
-      assertEquals(1, server.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount());
-      assertEquals(2, server.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + server.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount());
-   }
-
    @Test(timeout = 60000)
    public void testMessageDurableFalse() throws Exception {
       sendMessages(getTestName(), 1, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ade9b21/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 08d9787..042effd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
@@ -341,6 +342,11 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       final long id;
 
       @Override
+      public RoutingType getRouteType() {
+         return null;
+      }
+
+      @Override
       public SimpleString getReplyTo() {
          return null;
       }