You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2017/03/09 14:13:01 UTC

[2/3] activemq-artemis git commit: ARTEMIS-1009 Tight up on AMQP

ARTEMIS-1009 Tight up on AMQP

avoiding application properties even further
fixing nolocal


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c3690843
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c3690843
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c3690843

Branch: refs/heads/master
Commit: c36908438178e193392c1639067d281ce895b914
Parents: 8aa55a6
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Mar 8 16:11:35 2017 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Mar 9 08:12:42 2017 -0600

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      | 19 ++++++++-
 .../artemis/core/message/impl/CoreMessage.java  |  7 +++-
 .../protocol/amqp/broker/AMQPMessage.java       | 42 ++++++++++++++++----
 .../amqp/broker/AMQPSessionCallback.java        | 11 +----
 .../amqp/proton/ProtonServerSenderContext.java  |  6 +--
 .../openwire/OpenWireMessageConverter.java      |  3 --
 .../core/protocol/openwire/OpenwireMessage.java |  2 +-
 .../core/postoffice/impl/BindingsImpl.java      | 28 ++++++-------
 .../core/postoffice/impl/PostOfficeImpl.java    |  6 +--
 .../core/server/cluster/impl/Redistributor.java |  2 +-
 .../artemis/core/server/impl/QueueImpl.java     |  2 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  2 +-
 .../integration/client/AcknowledgeTest.java     |  2 +-
 13 files changed, 84 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index c7a831b..5fd0778 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -167,7 +167,9 @@ public interface Message {
 
    default SimpleString getDeliveryAnnotationPropertyString(SimpleString property) {
       Object obj = getDeliveryAnnotationProperty(property);
-      if (obj instanceof SimpleString) {
+      if (obj == null) {
+         return null;
+      } else if (obj instanceof SimpleString) {
          return (SimpleString)obj;
       } else {
          return SimpleString.toSimpleString(obj.toString());
@@ -232,6 +234,10 @@ public interface Message {
     * */
    RefCountMessageListener getContext();
 
+   default SimpleString getGroupID() {
+      return null;
+   }
+
    SimpleString getReplyTo();
 
    Message setReplyTo(SimpleString address);
@@ -256,6 +262,15 @@ public interface Message {
     */
    long getMessageID();
 
+   // used for NO-LOCAL: mainly for AMQP
+   default Message setConnectionID(String connectionID) {
+      return this;
+   }
+
+   default String getConnectionID() {
+      return null;
+   }
+
    Message setMessageID(long id);
 
    default boolean isLargeMessage() {
@@ -503,7 +518,7 @@ public interface Message {
 
    Object getObjectProperty(SimpleString key);
 
-   Object removeDeliveryAnnoationProperty(SimpleString key);
+   Object removeDeliveryAnnotationProperty(SimpleString key);
 
    Object getDeliveryAnnotationProperty(SimpleString key);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index e8d7cc5..8d15384 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -104,7 +104,7 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    /** On core there's no delivery annotation */
    @Override
-   public Object removeDeliveryAnnoationProperty(SimpleString key) {
+   public Object removeDeliveryAnnotationProperty(SimpleString key) {
       return removeProperty(key);
    }
 
@@ -194,6 +194,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
       return new ChannelBufferWrapper(buffer.slice(BODY_OFFSET, endOfBodyPosition - BUFFER_HEADER_SPACE).setIndex(0, endOfBodyPosition - BUFFER_HEADER_SPACE).asReadOnly());
    }
 
+   @Override
+   public SimpleString getGroupID() {
+      return this.getSimpleStringProperty(Message.HDR_GROUP_ID);
+   }
+
    /**
     *
     * @param sendBuffer

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 1859dcf..24175d2 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -37,6 +37,7 @@ import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter;
 import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
@@ -72,6 +73,7 @@ public class AMQPMessage extends RefCountMessage {
    private Properties _properties;
    private ApplicationProperties applicationProperties;
    private long scheduledTime = -1;
+   private String connectionID;
 
    public AMQPMessage(long messageFormat, byte[] data) {
       this.data = Unpooled.wrappedBuffer(data);
@@ -128,7 +130,6 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    private Map getApplicationPropertiesMap() {
-
       ApplicationProperties appMap = getApplicationProperties();
       Map map = null;
 
@@ -158,6 +159,17 @@ public class AMQPMessage extends RefCountMessage {
          parsedHeaders = true;
       }
    }
+   @Override
+   public org.apache.activemq.artemis.api.core.Message setConnectionID(String connectionID) {
+      this.connectionID = connectionID;
+      return this;
+   }
+
+   @Override
+   public String getConnectionID() {
+      return connectionID;
+   }
+
 
    public MessageAnnotations getMessageAnnotations() {
       parseHeaders();
@@ -222,6 +234,17 @@ public class AMQPMessage extends RefCountMessage {
    }
 
 
+   @Override
+   public SimpleString getGroupID() {
+      parseHeaders();
+
+      if (_properties != null && _properties.getGroupId() != null) {
+         return SimpleString.toSimpleString(_properties.getGroupId());
+      } else {
+         return null;
+      }
+   }
+
 
    @Override
    public Long getScheduledDeliveryTime() {
@@ -667,11 +690,13 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public Object getObjectProperty(String key) {
-      if (key.equals("JMSType")) {
+      if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
          return getProperties().getSubject();
+      } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
+         return getConnectionID();
+      } else {
+         return getApplicationPropertiesMap().get(key);
       }
-
-      return getApplicationPropertiesMap().get(key);
    }
 
    @Override
@@ -686,10 +711,13 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public String getStringProperty(String key) throws ActiveMQPropertyConversionException {
-      if (key.equals("JMSType")) {
+      if (key.equals(MessageUtil.TYPE_HEADER_NAME.toString())) {
          return getProperties().getSubject();
+      } else if (key.equals(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString())) {
+         return getConnectionID();
+      } else {
+         return (String)getApplicationPropertiesMap().get(key);
       }
-      return (String)getApplicationPropertiesMap().get(key);
    }
 
    @Override
@@ -702,7 +730,7 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public Object removeDeliveryAnnoationProperty(SimpleString key) {
+   public Object removeDeliveryAnnotationProperty(SimpleString key) {
       parseHeaders();
       if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
          return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index a079190..fb355ff 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
@@ -394,10 +393,10 @@ public class AMQPSessionCallback implements SessionCallback {
                            final Receiver receiver) throws Exception {
       try {
 
-//         message.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString(), receiver.getSession().getConnection().getRemoteContainer());
+         message.setConnectionID(receiver.getSession().getConnection().getRemoteContainer());
+
          serverSession.send(transaction, message, false, false);
 
-         // FIXME Potential race here...
          manager.getServer().getStorageManager().afterCompleteOperations(new IOCallback() {
             @Override
             public void done() {
@@ -421,10 +420,6 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
-   public String getPubSubPrefix() {
-      return manager.getPubSubPrefix();
-   }
-
    public void offerProducerCredit(final String address,
                                    final int credits,
                                    final int threshold,
@@ -482,8 +477,6 @@ public class AMQPSessionCallback implements SessionCallback {
    @Override
    public int sendMessage(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
 
-      message.removeProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString());
-
       ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext();
 
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index fcbd47e..a2a8eda 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -32,7 +32,6 @@ import org.apache.activemq.artemis.core.postoffice.impl.CompositeAddress;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
-import org.apache.activemq.artemis.jms.client.ActiveMQConnection;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
@@ -44,6 +43,7 @@ import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResource
 import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionImpl;
 import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore;
+import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.selector.filter.FilterException;
 import org.apache.activemq.artemis.selector.impl.SelectorParser;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -193,7 +193,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
                boolean noLocal = false;
 
                String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-               String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+               String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
 
                if (selector.endsWith(noLocalFilter)) {
                   if (selector.length() > noLocalFilter.length()) {
@@ -283,7 +283,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
             Map.Entry<Symbol, DescribedType> filter = AmqpSupport.findFilter(source.getFilter(), AmqpSupport.NO_LOCAL_FILTER_IDS);
             if (filter != null) {
                String remoteContainerId = sender.getSession().getConnection().getRemoteContainer();
-               String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
+               String noLocalFilter = MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'";
                if (selector != null) {
                   selector += " AND " + noLocalFilter;
                } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3808363..f578ac8 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -55,7 +55,6 @@ import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.DataStructure;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageDispatch;
@@ -374,8 +373,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
          coreMessage.putBytesProperty(AMQ_MSG_REPLY_TO, replyToBytes.data);
       }
 
-      ConsumerId consumerId = messageSend.getTargetConsumerId();
-
       String userId = messageSend.getUserID();
       if (userId != null) {
          coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 5793d58..2504a71 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -63,7 +63,7 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public Object removeDeliveryAnnoationProperty(SimpleString key) {
+   public Object removeDeliveryAnnotationProperty(SimpleString key) {
       return null;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 2de97e2..c7b6024 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -238,19 +238,17 @@ public final class BindingsImpl implements Bindings {
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
-      if (message.containsProperty(Message.HDR_SCALEDOWN_TO_IDS)) {
-         byte[] ids = (byte[]) message.removeProperty(Message.HDR_SCALEDOWN_TO_IDS);
-
-         if (ids != null) {
-            ByteBuffer buffer = ByteBuffer.wrap(ids);
-            while (buffer.hasRemaining()) {
-               long id = buffer.getLong();
-               for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) {
-                  if (entry.getValue() instanceof RemoteQueueBinding) {
-                     RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
-                     if (remoteQueueBinding.getRemoteQueueID() == id) {
-                        message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
-                     }
+      byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS);
+
+      if (ids != null) {
+         ByteBuffer buffer = ByteBuffer.wrap(ids);
+         while (buffer.hasRemaining()) {
+            long id = buffer.getLong();
+            for (Map.Entry<Long, Binding> entry : bindingsMap.entrySet()) {
+               if (entry.getValue() instanceof RemoteQueueBinding) {
+                  RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
+                  if (remoteQueueBinding.getRemoteQueueID() == id) {
+                     message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
                   }
                }
             }
@@ -270,10 +268,10 @@ public final class BindingsImpl implements Bindings {
 
       if (!routed) {
          // Remove the ids now, in order to avoid double check
-         byte[] ids = (byte[]) message.removeProperty(Message.HDR_ROUTE_TO_IDS);
+         ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS);
 
          // Fetch the groupId now, in order to avoid double checking
-         SimpleString groupId = message.getSimpleStringProperty(Message.HDR_GROUP_ID);
+         SimpleString groupId = message.getGroupID();
 
          if (ids != null) {
             routeFromCluster(message, context, ids);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 464859f..e2e7b24 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -820,8 +820,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       MessageReference reference = MessageReference.Factory.createReference(message, queue);
 
-      if (message.containsProperty(Message.HDR_SCHEDULED_DELIVERY_TIME)) {
-         Long scheduledDeliveryTime = message.getLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+      Long scheduledDeliveryTime = message.getScheduledDeliveryTime();
+      if (scheduledDeliveryTime != null) {
          reference.setScheduledDeliveryTime(scheduledDeliveryTime);
       }
 
@@ -1220,7 +1220,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                     AtomicBoolean startedTX) throws Exception {
       // Check the DuplicateCache for the Bridge first
 
-      Object bridgeDup = message.removeDeliveryAnnoationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
+      Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
       if (bridgeDup != null) {
          // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
          byte[] bridgeDupBytes = (byte[]) bridgeDup;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index e9477a8..26399dc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -131,7 +131,7 @@ public class Redistributor implements Consumer {
    public synchronized HandleStatus handle(final MessageReference reference) throws Exception {
       if (!active) {
          return HandleStatus.BUSY;
-      } else if (reference.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID) != null) {
+      } else if (reference.getMessage().getGroupID() != null) {
          //we shouldn't redistribute with message groups return NO_MATCH so other messages can be delivered
          return HandleStatus.NO_MATCH;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f0f8e97..fa111eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2162,7 +2162,7 @@ public class QueueImpl implements Queue {
       } else {
          try {
             // But we don't use the groupID on internal queues (clustered queues) otherwise the group map would leak forever
-            return ref.getMessage().getSimpleStringProperty(Message.HDR_GROUP_ID);
+            return ref.getMessage().getGroupID();
          } catch (Throwable e) {
             ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
             return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 1569822..2a604e7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -304,7 +304,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public Object removeDeliveryAnnoationProperty(SimpleString key) {
+      public Object removeDeliveryAnnotationProperty(SimpleString key) {
          return null;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c3690843/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index d93807f..7181c61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -362,7 +362,7 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Object removeDeliveryAnnoationProperty(SimpleString key) {
+      public Object removeDeliveryAnnotationProperty(SimpleString key) {
          return null;
       }