You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2017/03/20 23:01:37 UTC

[1/3] activemq-artemis git commit: ARTEMIS-1052 adding a test for expiry and AMQP

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9385ce487 -> 150f67f86


ARTEMIS-1052 adding a test for expiry and AMQP

This closes #1106


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7ac27df7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7ac27df7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7ac27df7

Branch: refs/heads/master
Commit: 7ac27df7a0208714d70b920c1be9635bf0097eec
Parents: 9385ce4
Author: Jiri Danek <jd...@redhat.com>
Authored: Mon Mar 20 10:38:05 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 20 18:05:45 2017 -0400

----------------------------------------------------------------------
 .../amqp/SendingAndReceivingTest.java           | 43 ++++++++++++++++++++
 1 file changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7ac27df7/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
index 46e3b95..3c4e915 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
@@ -26,8 +27,11 @@ import javax.jms.TextMessage;
 import java.util.Random;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
@@ -91,6 +95,45 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testSendMessageThenAllowToExpireUsingTimeToLive() throws Exception {
+      AddressSettings as = new AddressSettings();
+      as.setExpiryAddress(SimpleString.toSimpleString("DLQ"));
+      HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
+      repos.addMatch("exampleQueue", as);
+
+      Connection connection = null;
+      ConnectionFactory connectionFactory = new JmsConnectionFactory("amqp://localhost:61616");
+
+      try {
+         connection = connectionFactory.createConnection();
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         String address = "exampleQueue";
+         Queue queue = session.createQueue(address);
+
+         MessageProducer sender = session.createProducer(queue);
+         sender.setTimeToLive(10);
+
+         Message message = session.createMessage();
+         sender.send(message);
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(queue);
+         Message m = consumer.receive(5000);
+         Assert.assertNull(m);
+         consumer.close();
+
+         consumer = session.createConsumer(session.createQueue("DLQ"));
+         m = consumer.receive(5000);
+         Assert.assertNotNull(m);
+         consumer.close();
+      } finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
    private static String createMessage(int messageSize) {
       final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
       Random rnd = new Random();


[2/3] activemq-artemis git commit: ARTEMIS-1052 Proper Expiry over AMQP

Posted by ta...@apache.org.
ARTEMIS-1052 Proper Expiry over AMQP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/65ac7f70
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/65ac7f70
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/65ac7f70

Branch: refs/heads/master
Commit: 65ac7f700ba0a9409ecd345b77a991c5aedd53b9
Parents: 7ac27df
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Mar 20 12:24:42 2017 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 20 18:33:31 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  44 +++++--
 .../artemis/core/message/impl/CoreMessage.java  |   4 +-
 .../protocol/amqp/broker/AMQPMessage.java       | 105 +++++++++++----
 .../amqp/proton/ProtonServerSenderContext.java  |   2 +-
 .../core/protocol/openwire/OpenwireMessage.java |   4 +-
 .../impl/journal/LargeServerMessageImpl.java    |   2 +-
 .../core/postoffice/impl/BindingsImpl.java      |   4 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +-
 .../artemis/core/server/impl/DivertImpl.java    |   2 +
 .../artemis/core/server/impl/QueueImpl.java     |  18 ++-
 .../impl/ScheduledDeliveryHandlerTest.java      |   4 +-
 .../transport/amqp/client/AmqpMessage.java      |   2 +-
 .../integration/amqp/AmqpClientTestSupport.java |  88 +------------
 .../integration/amqp/AmqpNettyFailoverTest.java |   1 -
 .../tests/integration/amqp/AmqpTestSupport.java | 127 +++++++++++++++++++
 .../amqp/SendingAndReceivingTest.java           |  94 ++++++++++++--
 .../integration/client/AcknowledgeTest.java     |   4 +-
 17 files changed, 359 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index ec0a2db..56097ae 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -364,23 +364,30 @@ public interface Message {
       }
       setBuffer(null);
    }
+
+   default void reencode() {
+      // only valid probably on AMQP
+   }
+
    default void referenceOriginalMessage(final Message original, String originalQueue) {
-      String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString());
+      String queueOnMessage = original.getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
 
       if (queueOnMessage != null) {
-         putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), queueOnMessage);
+         setAnnotation(Message.HDR_ORIGINAL_QUEUE, queueOnMessage);
       } else if (originalQueue != null) {
-         putStringProperty(Message.HDR_ORIGINAL_QUEUE.toString(), originalQueue);
+         setAnnotation(Message.HDR_ORIGINAL_QUEUE, originalQueue);
       }
 
-      if (original.containsProperty(Message.HDR_ORIG_MESSAGE_ID.toString())) {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString()));
+      Object originalID = original.getAnnotation(Message.HDR_ORIG_MESSAGE_ID);
+
+      if (originalID != null) {
+         setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAnnotationString(Message.HDR_ORIGINAL_ADDRESS));
 
-         putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString()));
+         setAnnotation(Message.HDR_ORIG_MESSAGE_ID, originalID);
       } else {
-         putStringProperty(Message.HDR_ORIGINAL_ADDRESS.toString(), original.getAddress());
+         setAnnotation(Message.HDR_ORIGINAL_ADDRESS, original.getAddress());
 
-         putLongProperty(Message.HDR_ORIG_MESSAGE_ID.toString(), original.getMessageID());
+         setAnnotation(Message.HDR_ORIG_MESSAGE_ID, original.getMessageID());
       }
 
       // reset expiry
@@ -503,9 +510,26 @@ public interface Message {
 
    Object getObjectProperty(SimpleString key);
 
-   Object removeDeliveryAnnotationProperty(SimpleString key);
+   default Object removeAnnotation(SimpleString key) {
+      return removeProperty(key);
+   }
 
-   Object getDeliveryAnnotationProperty(SimpleString key);
+   default String getAnnotationString(SimpleString key) {
+      Object value = getAnnotation(key);
+      if (value != null) {
+         return value.toString();
+      } else {
+         return null;
+      }
+   }
+
+   Object getAnnotation(SimpleString key);
+
+   /** Callers must call {@link #reencode()} in order to be sent to clients */
+   default Message setAnnotation(SimpleString key, Object value) {
+      putObjectProperty(key, value);
+      return this;
+   }
 
    Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index ce1ea96..f0a8715 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -98,13 +98,13 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
 
    /** On core there's no delivery annotation */
    @Override
-   public Object getDeliveryAnnotationProperty(SimpleString key) {
+   public Object getAnnotation(SimpleString key) {
       return getObjectProperty(key);
    }
 
    /** On core there's no delivery annotation */
    @Override
-   public Object removeDeliveryAnnotationProperty(SimpleString key) {
+   public Object removeAnnotation(SimpleString key) {
       return removeProperty(key);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index c1c676c..e01d430 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -62,14 +63,14 @@ public class AMQPMessage extends RefCountMessage {
    final long messageFormat;
    ByteBuf data;
    boolean bufferValid;
-   byte type;
+   boolean durable;
    long messageID;
    String address;
    MessageImpl protonMessage;
    private volatile int memoryEstimate = -1;
    private long expiration = 0;
    // this is to store where to start sending bytes, ignoring header and delivery annotations.
-   private int sendFrom = -1;
+   private int sendFrom = 0;
    private boolean parsedHeaders = false;
    private Header _header;
    private DeliveryAnnotations _deliveryAnnotations;
@@ -123,7 +124,7 @@ public class AMQPMessage extends RefCountMessage {
    private void initalizeObjects() {
       if (protonMessage == null) {
          if (data == null) {
-            this.sendFrom = -1;
+            this.sendFrom = 0;
             _header = new Header();
             _deliveryAnnotations = new DeliveryAnnotations(new HashMap<>());
             _properties = new Properties();
@@ -220,12 +221,27 @@ public class AMQPMessage extends RefCountMessage {
       return null;
    }
 
+   private Object removeSymbol(Symbol symbol) {
+      MessageAnnotations annotations = getMessageAnnotations();
+      Map mapAnnotations = annotations != null ? annotations.getValue() : null;
+      if (mapAnnotations != null) {
+         return mapAnnotations.remove(symbol);
+      }
+
+      return null;
+   }
+
+
    private void setSymbol(String symbol, Object value) {
       setSymbol(Symbol.getSymbol(symbol), value);
    }
 
    private void setSymbol(Symbol symbol, Object value) {
       MessageAnnotations annotations = getMessageAnnotations();
+      if (annotations == null) {
+         _messageAnnotations = new MessageAnnotations(new HashMap<>());
+         annotations = _messageAnnotations;
+      }
       Map mapAnnotations = annotations != null ? annotations.getValue() : null;
       if (mapAnnotations != null) {
          mapAnnotations.put(symbol, value);
@@ -408,7 +424,14 @@ public class AMQPMessage extends RefCountMessage {
    @Override
    public org.apache.activemq.artemis.api.core.Message copy() {
       checkBuffer();
-      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array());
+
+      byte[] origin = data.array();
+      byte[] newData = new byte[data.array().length - sendFrom];
+      for (int i = 0; i < newData.length; i++) {
+         newData[i] = origin[i + sendFrom];
+      }
+      AMQPMessage newEncode = new AMQPMessage(this.messageFormat, newData);
+      newEncode.setDurable(isDurable());
       return newEncode;
    }
 
@@ -436,6 +459,16 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public AMQPMessage setExpiration(long expiration) {
+
+      Properties properties = getProperties();
+
+      if (properties != null) {
+         if (expiration <= 0) {
+            properties.setAbsoluteExpiryTime(null);
+         } else {
+            properties.setAbsoluteExpiryTime(new Date(expiration));
+         }
+      }
       this.expiration = expiration;
       return this;
    }
@@ -460,7 +493,7 @@ public class AMQPMessage extends RefCountMessage {
       if (getHeader() != null && getHeader().getDurable() != null) {
          return getHeader().getDurable().booleanValue();
       } else {
-         return false;
+         return durable;
       }
    }
 
@@ -471,7 +504,8 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public org.apache.activemq.artemis.api.core.Message setDurable(boolean durable) {
-      return null;
+      this.durable = durable;
+      return this;
    }
 
    @Override
@@ -544,11 +578,19 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
+   public int getEncodeSize() {
+      checkBuffer();
+      // + 20checkBuffer is an estimate for the Header with the deliveryCount
+      return data.array().length - sendFrom + 20;
+   }
+
+   @Override
    public void sendBuffer(ByteBuf buffer, int deliveryCount) {
       checkBuffer();
       Header header = getHeader();
       if (header == null && deliveryCount > 0) {
          header = new Header();
+         header.setDurable(durable);
       }
       if (header != null) {
          synchronized (header) {
@@ -756,20 +798,37 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public Object removeDeliveryAnnotationProperty(SimpleString key) {
-      parseHeaders();
-      if (_deliveryAnnotations == null || _deliveryAnnotations.getValue() == null) {
-         return null;
-      }
-      return _deliveryAnnotations.getValue().remove(key.toString());
+   public Object removeAnnotation(SimpleString key) {
+      return removeSymbol(Symbol.getSymbol(key.toString()));
    }
 
    @Override
-   public Object getDeliveryAnnotationProperty(SimpleString key) {
-      return null;
+   public Object getAnnotation(SimpleString key) {
+      return getSymbol(key.toString());
    }
 
    @Override
+   public AMQPMessage setAnnotation(SimpleString key, Object value) {
+      setSymbol(key.toString(), value);
+      return this;
+   }
+
+
+   @Override
+   public void reencode() {
+      parseHeaders();
+      ApplicationProperties properties = getApplicationProperties();
+      getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
+      getProtonMessage().setMessageAnnotations(_messageAnnotations);
+      getProtonMessage().setApplicationProperties(properties);
+      getProtonMessage().setProperties(this._properties);
+      bufferValid = false;
+      checkBuffer();
+   }
+
+
+
+   @Override
    public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
       return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
    }
@@ -850,11 +909,6 @@ public class AMQPMessage extends RefCountMessage {
    }
 
    @Override
-   public int getEncodeSize() {
-      return 0;
-   }
-
-   @Override
    public Set<SimpleString> getPropertyNames() {
       HashSet<SimpleString> values = new HashSet<>();
       for (Object k : getApplicationPropertiesMap().keySet()) {
@@ -901,15 +955,18 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public int getPersistSize() {
-      checkBuffer();
-      return data.array().length + DataConstants.SIZE_INT;
+      return DataConstants.SIZE_INT + internalPersistSize();
+   }
+
+   private int internalPersistSize() {
+      return data.array().length - sendFrom;
    }
 
    @Override
    public void persist(ActiveMQBuffer targetRecord) {
       checkBuffer();
-      targetRecord.writeInt(data.array().length);
-      targetRecord.writeBytes(data.array());
+      targetRecord.writeInt(internalPersistSize());
+      targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom);
    }
 
    @Override
@@ -917,8 +974,10 @@ public class AMQPMessage extends RefCountMessage {
       int size = record.readInt();
       byte[] recordArray = new byte[size];
       record.readBytes(recordArray);
+      this.sendFrom = 0; // whatever was persisted will be sent
       this.data = Unpooled.wrappedBuffer(recordArray);
       this.bufferValid = true;
+      this.durable = true; // it's coming from the journal, so it's durable
       parseHeaders();
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 5a97c02..d24464c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -594,7 +594,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       // we only need a tag if we are going to settle later
       byte[] tag = preSettle ? new byte[0] : protonSession.getTag();
 
-      ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+      ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(message.getEncodeSize());
       try {
          message.sendBuffer(nettyBuffer, deliveryCount);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
index 9fb6eb9..3bd95f4 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenwireMessage.java
@@ -58,12 +58,12 @@ public class OpenwireMessage implements Message {
    }
 
    @Override
-   public Object removeDeliveryAnnotationProperty(SimpleString key) {
+   public Object removeAnnotation(SimpleString key) {
       return null;
    }
 
    @Override
-   public Object getDeliveryAnnotationProperty(SimpleString key) {
+   public Object getAnnotation(SimpleString key) {
       return null;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
index 46bd335..2c3e01c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/LargeServerMessageImpl.java
@@ -242,7 +242,7 @@ public final class LargeServerMessageImpl extends CoreMessage implements LargeSe
          LargeServerMessageImpl otherLM = (LargeServerMessageImpl) original;
          this.paged = otherLM.paged;
          if (this.paged) {
-            this.removeProperty(Message.HDR_ORIG_MESSAGE_ID);
+            this.removeAnnotation(Message.HDR_ORIG_MESSAGE_ID);
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index c7b6024..377223b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -238,7 +238,7 @@ public final class BindingsImpl implements Bindings {
       /* This is a special treatment for scaled-down messages involving SnF queues.
        * See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
        */
-      byte[] ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_SCALEDOWN_TO_IDS);
+      byte[] ids = (byte[]) message.removeAnnotation(Message.HDR_SCALEDOWN_TO_IDS);
 
       if (ids != null) {
          ByteBuffer buffer = ByteBuffer.wrap(ids);
@@ -268,7 +268,7 @@ public final class BindingsImpl implements Bindings {
 
       if (!routed) {
          // Remove the ids now, in order to avoid double check
-         ids = (byte[]) message.removeDeliveryAnnotationProperty(Message.HDR_ROUTE_TO_IDS);
+         ids = (byte[]) message.removeAnnotation(Message.HDR_ROUTE_TO_IDS);
 
          // Fetch the groupId now, in order to avoid double checking
          SimpleString groupId = message.getGroupID();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 1f51210..2ef7657 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -765,6 +765,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
                message.setAddress(dlaAddress);
 
+               message.reencode();
+
                route(message, context.getTransaction(), false);
                result = RoutingStatus.NO_BINDINGS_DLA;
             }
@@ -1221,7 +1223,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                     AtomicBoolean startedTX) throws Exception {
       // Check the DuplicateCache for the Bridge first
 
-      Object bridgeDup = message.removeDeliveryAnnotationProperty(Message.HDR_BRIDGE_DUPLICATE_ID);
+      Object bridgeDup = message.removeAnnotation(Message.HDR_BRIDGE_DUPLICATE_ID);
       if (bridgeDup != null) {
          // if the message is being sent from the bridge, we just ignore the duplicate id, and use the internal one
          byte[] bridgeDupBytes = (byte[]) bridgeDup;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 5b0d406..c73fd80 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -104,6 +104,8 @@ public class DivertImpl implements Divert {
 
          copy.setExpiration(message.getExpiration());
 
+         copy.reencode();
+
          switch (routingType) {
             case ANYCAST:
                copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index fc655f6..406ba5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1718,14 +1718,14 @@ public class QueueImpl implements Queue {
    @Override
    public int retryMessages(Filter filter) throws Exception {
 
-      final HashMap<SimpleString, Long> queues = new HashMap<>();
+      final HashMap<String, Long> queues = new HashMap<>();
 
       return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws Exception {
 
-            SimpleString originalMessageAddress = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_ADDRESS);
-            SimpleString originalMessageQueue = ref.getMessage().getSimpleStringProperty(Message.HDR_ORIGINAL_QUEUE);
+            String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
+            String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
 
             if (originalMessageAddress != null) {
 
@@ -1735,7 +1735,7 @@ public class QueueImpl implements Queue {
                if (originalMessageQueue != null && !originalMessageQueue.equals(originalMessageAddress)) {
                   targetQueue = queues.get(originalMessageQueue);
                   if (targetQueue == null) {
-                     Binding binding = postOffice.getBinding(originalMessageQueue);
+                     Binding binding = postOffice.getBinding(SimpleString.toSimpleString(originalMessageQueue));
 
                      if (binding != null && binding instanceof LocalQueueBinding) {
                         targetQueue = ((LocalQueueBinding) binding).getID();
@@ -1745,9 +1745,9 @@ public class QueueImpl implements Queue {
                }
 
                if (targetQueue != null) {
-                  move(originalMessageAddress, tx, ref, false, false, targetQueue.longValue());
+                  move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue());
                } else {
-                  move(originalMessageAddress, tx, ref, false, false);
+                  move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
 
                }
 
@@ -2495,10 +2495,14 @@ public class QueueImpl implements Queue {
          copy.referenceOriginalMessage(message, ref != null ? ref.getQueue().getName().toString() : null);
       }
 
+      copy.setExpiration(0);
+
       if (expiry) {
-         copy.putLongProperty(Message.HDR_ACTUAL_EXPIRY_TIME.toString(), System.currentTimeMillis());
+         copy.setAnnotation(Message.HDR_ACTUAL_EXPIRY_TIME, System.currentTimeMillis());
       }
 
+      copy.reencode();
+
       return copy;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 8d06286..dd48b58 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -299,12 +299,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public Object removeDeliveryAnnotationProperty(SimpleString key) {
+      public Object removeAnnotation(SimpleString key) {
          return null;
       }
 
       @Override
-      public Object getDeliveryAnnotationProperty(SimpleString key) {
+      public Object getAnnotation(SimpleString key) {
          return null;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 8599fa9..bf9e0b5 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -471,7 +471,7 @@ public class AmqpMessage {
    /**
     * Sets the creation time property on the message.
     *
-    * @param absoluteExpiryTime the expiration time value to set.
+    * @param creationTime the time value to set.
     */
    public void setCreationTime(long creationTime) {
       checkReadOnly();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index d9b45d3..99ce4db 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -17,8 +17,6 @@
 
 package org.apache.activemq.artemis.tests.integration.amqp;
 
-import java.net.URI;
-import java.util.LinkedList;
 import java.util.Set;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -32,7 +30,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
@@ -46,23 +43,14 @@ import org.junit.Before;
  * Test support class for tests that will be using the AMQP Proton wrapper client.
  * This is to make it easier to migrate tests from ActiveMQ5
  */
-public class AmqpClientTestSupport extends ActiveMQTestBase {
+public class AmqpClientTestSupport extends AmqpTestSupport {
 
    protected static Symbol SHARED = Symbol.getSymbol("shared");
    protected static Symbol GLOBAL = Symbol.getSymbol("global");
 
 
-   private boolean useSSL;
-
    protected JMSServerManager serverManager;
    protected ActiveMQServer server;
-   protected LinkedList<AmqpConnection> connections = new LinkedList<>();
-
-   protected AmqpConnection addConnection(AmqpConnection connection) {
-      connections.add(connection);
-      return connection;
-   }
-
    @Before
    @Override
    public void setUp() throws Exception {
@@ -80,6 +68,7 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
             ignored.printStackTrace();
          }
       }
+      connections.clear();
 
       if (serverManager != null) {
          try {
@@ -149,79 +138,6 @@ public class AmqpClientTestSupport extends ActiveMQTestBase {
       this.useSSL = useSSL;
    }
 
-   public boolean isUseSSL() {
-      return useSSL;
-   }
-
-   public String getAmqpConnectionURIOptions() {
-      return "";
-   }
-
-   public URI getBrokerAmqpConnectionURI() {
-      boolean webSocket = false;
-
-      try {
-         int port = 61616;
-
-         String uri = null;
-
-         if (isUseSSL()) {
-            if (webSocket) {
-               uri = "wss://127.0.0.1:" + port;
-            } else {
-               uri = "ssl://127.0.0.1:" + port;
-            }
-         } else {
-            if (webSocket) {
-               uri = "ws://127.0.0.1:" + port;
-            } else {
-               uri = "tcp://127.0.0.1:" + port;
-            }
-         }
-
-         if (!getAmqpConnectionURIOptions().isEmpty()) {
-            uri = uri + "?" + getAmqpConnectionURIOptions();
-         }
-
-         return new URI(uri);
-      } catch (Exception e) {
-         throw new RuntimeException();
-      }
-   }
-
-   public AmqpConnection createAmqpConnection() throws Exception {
-      return createAmqpConnection(getBrokerAmqpConnectionURI());
-   }
-
-   public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
-      return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
-   }
-
-   public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
-      return createAmqpConnection(brokerURI, null, null);
-   }
-
-   public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
-      return createAmqpClient(brokerURI, username, password).connect();
-   }
-
-   public AmqpClient createAmqpClient() throws Exception {
-      return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
-   }
-
-   public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
-      return createAmqpClient(brokerURI, null, null);
-   }
-
-   public AmqpClient createAmqpClient(String username, String password) throws Exception {
-      return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
-   }
-
-   public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
-      return new AmqpClient(brokerURI, username, password);
-   }
-
-
    protected void sendMessages(int numMessages, String address) throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
index 5496883..5fb4e35 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest;
 import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Assert;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
new file mode 100644
index 0000000..8f89452
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpTestSupport.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.net.URI;
+import java.util.LinkedList;
+
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.junit.After;
+
+/** This will only add methods to support AMQP Testing without creating servers or anything */
+public class AmqpTestSupport extends ActiveMQTestBase {
+   protected LinkedList<AmqpConnection> connections = new LinkedList<>();
+
+   protected boolean useSSL;
+
+   protected AmqpConnection addConnection(AmqpConnection connection) {
+      connections.add(connection);
+      return connection;
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception {
+      for (AmqpConnection conn : connections) {
+         try {
+            conn.close();
+         } catch (Throwable ignored) {
+            ignored.printStackTrace();
+         }
+      }
+
+      super.tearDown();
+   }
+
+   public boolean isUseSSL() {
+      return useSSL;
+   }
+
+   public String getAmqpConnectionURIOptions() {
+      return "";
+   }
+
+   public URI getBrokerAmqpConnectionURI() {
+      boolean webSocket = false;
+
+      try {
+         int port = 61616;
+
+         String uri = null;
+
+         if (isUseSSL()) {
+            if (webSocket) {
+               uri = "wss://127.0.0.1:" + port;
+            } else {
+               uri = "ssl://127.0.0.1:" + port;
+            }
+         } else {
+            if (webSocket) {
+               uri = "ws://127.0.0.1:" + port;
+            } else {
+               uri = "tcp://127.0.0.1:" + port;
+            }
+         }
+
+         if (!getAmqpConnectionURIOptions().isEmpty()) {
+            uri = uri + "?" + getAmqpConnectionURIOptions();
+         }
+
+         return new URI(uri);
+      } catch (Exception e) {
+         throw new RuntimeException();
+      }
+   }
+
+   public AmqpConnection createAmqpConnection() throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI());
+   }
+
+   public AmqpConnection createAmqpConnection(String username, String password) throws Exception {
+      return createAmqpConnection(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   public AmqpConnection createAmqpConnection(URI brokerURI) throws Exception {
+      return createAmqpConnection(brokerURI, null, null);
+   }
+
+   public AmqpConnection createAmqpConnection(URI brokerURI, String username, String password) throws Exception {
+      return createAmqpClient(brokerURI, username, password).connect();
+   }
+
+   public AmqpClient createAmqpClient() throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), null, null);
+   }
+
+   public AmqpClient createAmqpClient(URI brokerURI) throws Exception {
+      return createAmqpClient(brokerURI, null, null);
+   }
+
+   public AmqpClient createAmqpClient(String username, String password) throws Exception {
+      return createAmqpClient(getBrokerAmqpConnectionURI(), username, password);
+   }
+
+   public AmqpClient createAmqpClient(URI brokerURI, String username, String password) throws Exception {
+      return new AmqpClient(brokerURI, username, password);
+   }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
index 3c4e915..87a4710 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/SendingAndReceivingTest.java
@@ -26,20 +26,28 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-public class SendingAndReceivingTest extends ActiveMQTestBase {
+public class SendingAndReceivingTest extends AmqpTestSupport {
 
    private ActiveMQServer server;
 
@@ -55,7 +63,15 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
             tc.getExtraParams().put("multicastPrefix", "multicast://");
          }
       }
+      server.getConfiguration().setMessageExpiryScanPeriod(1);
       server.start();
+      server.createQueue(SimpleString.toSimpleString("exampleQueue"), RoutingType.ANYCAST, SimpleString.toSimpleString("exampleQueue"), null, true, false, -1, false, true);
+      server.createQueue(SimpleString.toSimpleString("DLQ"), RoutingType.ANYCAST, SimpleString.toSimpleString("DLQ"), null, true, false, -1, false, true);
+
+      AddressSettings as = new AddressSettings();
+      as.setExpiryAddress(SimpleString.toSimpleString("DLQ"));
+      HierarchicalRepository<AddressSettings> repos = server.getAddressSettingsRepository();
+      repos.addMatch("#", as);
    }
 
    @After
@@ -112,21 +128,24 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
          Queue queue = session.createQueue(address);
 
          MessageProducer sender = session.createProducer(queue);
-         sender.setTimeToLive(10);
+         sender.setTimeToLive(1);
 
          Message message = session.createMessage();
          sender.send(message);
          connection.start();
 
-         MessageConsumer consumer = session.createConsumer(queue);
-         Message m = consumer.receive(5000);
-         Assert.assertNull(m);
+         MessageConsumer consumer = session.createConsumer(session.createQueue("DLQ"));
+         Message m = consumer.receive(10000);
+         Assert.assertNotNull(m);
          consumer.close();
 
-         consumer = session.createConsumer(session.createQueue("DLQ"));
-         m = consumer.receive(5000);
-         Assert.assertNotNull(m);
+
+         consumer = session.createConsumer(queue);
+         m = consumer.receiveNoWait();
+         Assert.assertNull(m);
          consumer.close();
+
+
       } finally {
          if (connection != null) {
             connection.close();
@@ -134,6 +153,63 @@ public class SendingAndReceivingTest extends ActiveMQTestBase {
       }
    }
 
+   @Test(timeout = 60000)
+   public void testSendExpiry() throws Throwable {
+      internalSendExpiry(false);
+   }
+
+   @Test(timeout = 60000)
+   public void testSendExpiryRestartServer() throws Throwable {
+      internalSendExpiry(true);
+   }
+
+   public void internalSendExpiry(boolean restartServer) throws Throwable {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = client.connect();
+
+      try {
+
+         // Normal Session which won't create an TXN itself
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender("exampleQueue");
+
+         AmqpMessage message = new AmqpMessage();
+         message.setDurable(true);
+         message.setText("Test-Message");
+         message.setDeliveryAnnotation("shouldDisappear", 1);
+         message.setAbsoluteExpiryTime(System.currentTimeMillis() + 1000);
+         sender.send(message);
+
+         org.apache.activemq.artemis.core.server.Queue dlq = server.locateQueue(SimpleString.toSimpleString("DLQ"));
+
+         Wait.waitFor(() -> dlq.getMessageCount() > 0, 5000, 500);
+
+         connection.close();
+
+         if (restartServer) {
+            server.stop();
+            server.start();
+         }
+
+         connection = client.connect();
+         session = connection.createSession();
+
+         // Read all messages from the Queue, do not accept them yet.
+         AmqpReceiver receiver = session.createReceiver("DLQ");
+         receiver.flow(20);
+         message = receiver.receive(5, TimeUnit.SECONDS);
+         Assert.assertNotNull(message);
+         Assert.assertEquals("exampleQueue", message.getMessageAnnotation(org.apache.activemq.artemis.api.core.Message.HDR_ORIGINAL_ADDRESS.toString()));
+         Assert.assertNull(message.getDeliveryAnnotation("shouldDisappear"));
+         Assert.assertNull(receiver.receiveNoWait());
+
+      } finally {
+         connection.close();
+      }
+   }
+
+
+
    private static String createMessage(int messageSize) {
       final String AB = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
       Random rnd = new Random();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/65ac7f70/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
index 43dad84..31e26e3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java
@@ -357,12 +357,12 @@ public class AcknowledgeTest extends ActiveMQTestBase {
       }
 
       @Override
-      public Object removeDeliveryAnnotationProperty(SimpleString key) {
+      public Object removeAnnotation(SimpleString key) {
          return null;
       }
 
       @Override
-      public Object getDeliveryAnnotationProperty(SimpleString key) {
+      public Object getAnnotation(SimpleString key) {
          return null;
       }
 


[3/3] activemq-artemis git commit: This closes #1110

Posted by ta...@apache.org.
This closes #1110


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/150f67f8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/150f67f8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/150f67f8

Branch: refs/heads/master
Commit: 150f67f8622cb8b67e7b68e327db387178f6ad70
Parents: 9385ce4 65ac7f7
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Mar 20 19:00:59 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Mar 20 19:00:59 2017 -0400

----------------------------------------------------------------------
 .../activemq/artemis/api/core/Message.java      |  44 +++++--
 .../artemis/core/message/impl/CoreMessage.java  |   4 +-
 .../protocol/amqp/broker/AMQPMessage.java       | 105 +++++++++++----
 .../amqp/proton/ProtonServerSenderContext.java  |   2 +-
 .../core/protocol/openwire/OpenwireMessage.java |   4 +-
 .../impl/journal/LargeServerMessageImpl.java    |   2 +-
 .../core/postoffice/impl/BindingsImpl.java      |   4 +-
 .../core/postoffice/impl/PostOfficeImpl.java    |   4 +-
 .../artemis/core/server/impl/DivertImpl.java    |   2 +
 .../artemis/core/server/impl/QueueImpl.java     |  18 ++-
 .../impl/ScheduledDeliveryHandlerTest.java      |   4 +-
 .../transport/amqp/client/AmqpMessage.java      |   2 +-
 .../integration/amqp/AmqpClientTestSupport.java |  88 +------------
 .../integration/amqp/AmqpNettyFailoverTest.java |   1 -
 .../tests/integration/amqp/AmqpTestSupport.java | 127 +++++++++++++++++++
 .../amqp/SendingAndReceivingTest.java           | 123 +++++++++++++++++-
 .../integration/client/AcknowledgeTest.java     |   4 +-
 17 files changed, 395 insertions(+), 143 deletions(-)
----------------------------------------------------------------------