You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2021/07/06 19:09:37 UTC

[activemq-artemis] branch main updated: ARTEMIS-2919 support timestamping incoming messages

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new b7f9807  ARTEMIS-2919 support timestamping incoming messages
     new b5f772e  This closes #3279
b7f9807 is described below

commit b7f9807cd97812e7827abe3703dca7a1c74fc87f
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Tue Dec 1 11:02:25 2020 -0600

    ARTEMIS-2919 support timestamping incoming messages
---
 .../apache/activemq/artemis/api/core/Message.java  |  14 ++
 .../protocol/amqp/broker/AMQPLargeMessage.java     |   1 +
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  19 ++
 .../amqp/broker/AMQPMessageBrokerAccessor.java     |   5 +
 .../amqp/converter/AMQPMessageSupport.java         |   3 +
 .../protocol/amqp/converter/AmqpCoreConverter.java |   3 +
 .../protocol/amqp/converter/CoreAmqpConverter.java |   3 +
 .../amqp/proton/ProtonServerSenderContext.java     |  15 +-
 .../openwire/OpenWireMessageConverter.java         |  14 ++
 .../artemis/core/protocol/stomp/Stomp.java         |   2 +
 .../artemis/core/protocol/stomp/StompUtils.java    |   3 +
 .../deployers/impl/FileConfigurationParser.java    |   3 +
 .../core/server/impl/ServerSessionImpl.java        |   5 +
 .../core/settings/impl/AddressSettings.java        |  35 +++-
 .../resources/schema/artemis-configuration.xsd     |   8 +
 .../core/config/impl/FileConfigurationTest.java    |   2 +
 .../core/message/impl/MessagePropertyTest.java     |   1 +
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 ...rationTest-xinclude-config-address-settings.xml |   1 +
 docs/user-manual/en/address-model.md               |  10 +
 .../integration/amqp/AmqpClientTestSupport.java    |  71 +++++++
 .../integration/amqp/AmqpIngressTimestampTest.java | 154 +++++++++++++++
 .../integration/client/IngressTimestampTest.java   | 207 +++++++++++++++++++++
 .../artemis/tests/integration/stomp/StompTest.java |  21 +++
 24 files changed, 596 insertions(+), 5 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index b5213e9..5b53be8 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -162,6 +162,11 @@ public interface Message {
    SimpleString HDR_ROUTING_TYPE = new SimpleString("_AMQ_ROUTING_TYPE");
 
    /**
+    * The time at which the message arrived at the broker.
+    */
+   SimpleString HDR_INGRESS_TIMESTAMP = new SimpleString("_AMQ_INGRESS_TIMESTAMP");
+
+   /**
     * The prefix used (if any) when sending this message.  For protocols (e.g. STOMP) that need to track this and restore
     * the prefix when the message is consumed.
     */
@@ -643,6 +648,15 @@ public interface Message {
       return getObjectProperty(key);
    }
 
+   default Message setIngressTimestamp() {
+      setBrokerProperty(HDR_INGRESS_TIMESTAMP, System.currentTimeMillis());
+      return this;
+   }
+
+   default Long getIngressTimestamp() {
+      return (Long) getBrokerProperty(HDR_INGRESS_TIMESTAMP);
+   }
+
    Short getShortProperty(SimpleString key) throws ActiveMQPropertyConversionException;
 
    Float getFloatProperty(SimpleString key) throws ActiveMQPropertyConversionException;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 771e745..526ad54 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -69,6 +69,7 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
          reader.readInto(wrapbuffer);
 
          AMQPStandardMessage standardMessage = new AMQPStandardMessage(messageFormat, buffer, extraProperties, coreMessageObjectPools);
+         standardMessage.setMessageAnnotations(messageAnnotations);
          standardMessage.setMessageID(messageID);
          return standardMessage.toCore();
       } catch (Exception e) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index f32c285..8606429 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -262,6 +262,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
       return applicationProperties;
    }
 
+   protected MessageAnnotations getDecodedMessageAnnotations() {
+      return messageAnnotations;
+   }
+
    protected abstract ReadableBuffer getData();
 
    // Access to the AMQP message data using safe copies freshly decoded from the current
@@ -586,6 +590,10 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
       getMessageAnnotationsMap(true).put(annotation, value);
    }
 
+   protected void setMessageAnnotations(MessageAnnotations messageAnnotations) {
+      this.messageAnnotations = messageAnnotations;
+   }
+
    // Message decoding and copying methods.  Care must be taken here to ensure the buffer and the
    // state tracking information is kept up to data.  When the message is manually changed a forced
    // re-encode should be done to update the backing data with the in memory elements.
@@ -1351,6 +1359,17 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
       return extra.getProperty(key);
    }
 
+   @Override
+   public final org.apache.activemq.artemis.api.core.Message setIngressTimestamp() {
+      setMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, System.currentTimeMillis());
+      return this;
+   }
+
+   @Override
+   public Long getIngressTimestamp() {
+      return (Long) getMessageAnnotation(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION);
+   }
+
 
    // JMS Style property access methods.  These can result in additional decode of AMQP message
    // data from Application properties.  Updates to application properties puts the message in a
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java
index a2cd44d..bab9ca9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageBrokerAccessor.java
@@ -20,6 +20,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 
 /** <b>Warning:</b> do not use this class outside of the broker implementation.
@@ -56,4 +57,8 @@ public class AMQPMessageBrokerAccessor {
       return message.getCurrentProperties();
    }
 
+   public static MessageAnnotations getDecodedMessageAnnotations(AMQPMessage message) {
+      return message.getDecodedMessageAnnotations();
+   }
+
 }
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
index 918d987..c8e1eba 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageSupport.java
@@ -183,6 +183,8 @@ public final class AMQPMessageSupport {
    public static final String X_OPT_PREFIX = "x-opt-";
    public static final String AMQ_PROPERTY_PREFIX = "_AMQ_";
 
+   public static final String X_OPT_INGRESS_TIME = X_OPT_PREFIX + "ingress-time";
+
    public static final short AMQP_UNKNOWN = 0;
    public static final short AMQP_NULL = 1;
    public static final short AMQP_DATA = 2;
@@ -195,6 +197,7 @@ public final class AMQPMessageSupport {
 
    public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-dest");
    public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = getSymbol("x-opt-jms-reply-to");
+   public static final Symbol INGRESS_TIME_MSG_ANNOTATION = getSymbol(X_OPT_INGRESS_TIME);
 
    public static final byte QUEUE_TYPE = 0x00;
    public static final byte TOPIC_TYPE = 0x01;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
index 8071dbe..3f14b30 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AmqpCoreConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
+import static org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
 import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_DATA;
 import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
@@ -292,6 +293,8 @@ public class AmqpCoreConverter {
                if (delay > 0) {
                   jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay);
                }
+            } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) && entry.getValue() != null) {
+               jms.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), ((Number) entry.getValue()).longValue());
             }
 
             try {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index ee7659e..bd27f8c 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -295,6 +295,9 @@ public class CoreAmqpConverter {
          } else if (key.endsWith(HDR_SCHEDULED_DELIVERY_TIME.toString())) {
             // skip..remove annotation from previous inbound transformation
             continue;
+         } else if (key.equals(Message.HDR_INGRESS_TIMESTAMP.toString())) {
+            maMap.put(AMQPMessageSupport.INGRESS_TIME_MSG_ANNOTATION, message.getLongProperty(key));
+            continue;
          }
 
          if (apMap == null) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index c65d86d..b383405 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -70,6 +70,7 @@ import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Properties;
@@ -701,7 +702,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
          try {
             int proposedPosition = writeHeaderAndAnnotations(context, deliveryAnnotationsToEncode);
             if (message.isReencoded()) {
-               proposedPosition = writePropertiesAndApplicationProperties(context, message);
+               proposedPosition = writeMessageAnnotationsPropertiesAndApplicationProperties(context, message);
             }
 
             context.position(proposedPosition);
@@ -716,14 +717,20 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
       /**
        * Write properties and application properties when the message is flagged as re-encoded.
        */
-      private int writePropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
+      private int writeMessageAnnotationsPropertiesAndApplicationProperties(LargeBodyReader context, AMQPLargeMessage message) throws Exception {
          int bodyPosition = AMQPMessageBrokerAccessor.getRemainingBodyPosition(message);
          assert bodyPosition > 0;
-         writePropertiesAndApplicationPropertiesInternal(message);
+         writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(message);
          return bodyPosition;
       }
 
-      private void writePropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
+      private void writeMessageAnnotationsPropertiesAndApplicationPropertiesInternal(AMQPLargeMessage message) {
+         MessageAnnotations messageAnnotations = AMQPMessageBrokerAccessor.getDecodedMessageAnnotations(message);
+
+         if (messageAnnotations != null) {
+            TLSEncode.getEncoder().writeObject(messageAnnotations);
+         }
+
          Properties amqpProperties = AMQPMessageBrokerAccessor.getCurrentProperties(message);
          if (amqpProperties != null) {
             TLSEncode.getEncoder().writeObject(amqpProperties);
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 136f7b9..337cefd 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -686,6 +686,11 @@ public final class OpenWireMessageConverter {
          setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
       }
 
+      final Long ingressTimestamp = coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) ? coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP) : null;
+      if (ingressTimestamp != null) {
+         setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
+      }
+
       final Set<SimpleString> props = coreMessage.getPropertyNames();
       if (props != null) {
          setAMQMsgObjectProperties(amqMsg, coreMessage, props);
@@ -937,6 +942,15 @@ public final class OpenWireMessageConverter {
       }
    }
 
+   private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage amqMsg,
+                                                    final Long ingressTimestamp) throws IOException {
+      try {
+         amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(), ingressTimestamp);
+      } catch (JMSException e) {
+         throw new IOException("failure to set ingress timestamp property " + ingressTimestamp, e);
+      }
+   }
+
    private static void setAMQMsgObjectProperties(final ActiveMQMessage amqMsg,
                                                  final ICoreMessage coreMessage,
                                                  final Set<SimpleString> props) throws IOException {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index c965302..5ec24f3 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -135,6 +135,8 @@ public interface Stomp {
          String PERSISTENT = "persistent";
 
          String VALIDATED_USER = "JMSXUserID";
+
+         String INGRESS_TIMESTAMP = "ingress-timestamp";
       }
 
       interface Subscribe {
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
index 07dcd8f..8829e40 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompUtils.java
@@ -132,6 +132,9 @@ public class StompUtils {
       if (message.containsProperty(Message.HDR_ROUTING_TYPE)) {
          command.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, RoutingType.getType(message.getByteProperty(Message.HDR_ROUTING_TYPE.toString())).toString());
       }
+      if (message.containsProperty(Message.HDR_INGRESS_TIMESTAMP)) {
+         command.addHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP, Long.toString(message.getLongProperty(Message.HDR_INGRESS_TIMESTAMP)));
+      }
 
       // now let's add all the rest of the message headers
       Set<SimpleString> names = message.getPropertyNames();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 915c2e7..2d8b81f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -311,6 +311,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String ENABLE_METRICS = "enable-metrics";
 
+   private static final String ENABLE_INGRESS_TIMESTAMP = "enable-ingress-timestamp";
 
    // Attributes ----------------------------------------------------
 
@@ -1361,6 +1362,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setExpiryQueueSuffix(new SimpleString(getTrimmedTextContent(child)));
          } else if (ENABLE_METRICS.equalsIgnoreCase(name)) {
             addressSettings.setEnableMetrics(XMLUtil.parseBoolean(child));
+         } else if (ENABLE_INGRESS_TIMESTAMP.equalsIgnoreCase(name)) {
+            addressSettings.setEnableIngressTimestamp(XMLUtil.parseBoolean(child));
          }
       }
       return setting;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 542a974..db9e2cb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -2181,6 +2181,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw ActiveMQMessageBundle.BUNDLE.rejectEmptyValidatedUser();
       }
 
+      if (server.getAddressSettingsRepository().getMatch(msg.getAddress()).isEnableIngressTimestamp()) {
+         msg.setIngressTimestamp();
+         msg.reencode();
+      }
+
       if (tx == null || autoCommitSends) {
          routingContext.setTransaction(null);
       } else {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 8f26b4e..79cece3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -133,6 +133,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final SlowConsumerThresholdMeasurementUnit DEFAULT_SLOW_CONSUMER_THRESHOLD_MEASUREMENT_UNIT = SlowConsumerThresholdMeasurementUnit.MESSAGES_PER_SECOND;
 
+   public static final boolean DEFAULT_ENABLE_INGRESS_TIMESTAMP = false;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -265,6 +267,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Integer managementMessageAttributeSizeLimit = null;
 
+   private Boolean enableIngressTimestamp = null;
+
    //from amq5
    //make it transient
    private transient Integer queuePrefetch = null;
@@ -332,6 +336,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.enableMetrics = other.enableMetrics;
       this.managementMessageAttributeSizeLimit = other.managementMessageAttributeSizeLimit;
       this.slowConsumerThresholdMeasurementUnit = other.slowConsumerThresholdMeasurementUnit;
+      this.enableIngressTimestamp = other.enableIngressTimestamp;
    }
 
    public AddressSettings() {
@@ -955,6 +960,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public boolean isEnableIngressTimestamp() {
+      return enableIngressTimestamp != null ? enableIngressTimestamp : AddressSettings.DEFAULT_ENABLE_INGRESS_TIMESTAMP;
+   }
+
+   public AddressSettings setEnableIngressTimestamp(final boolean enableIngressTimestamp) {
+      this.enableIngressTimestamp = enableIngressTimestamp;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -1154,6 +1168,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (enableMetrics == null) {
          enableMetrics = merged.enableMetrics;
       }
+      if (enableIngressTimestamp == null) {
+         enableIngressTimestamp = merged.enableIngressTimestamp;
+      }
    }
 
    @Override
@@ -1377,6 +1394,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
             slowConsumerThresholdMeasurementUnit = SlowConsumerThresholdMeasurementUnit.valueOf(slowConsumerMeasurementUnitEnumValue);
          }
       }
+
+      if (buffer.readableBytes() > 0) {
+         enableIngressTimestamp = BufferHelper.readNullableBoolean(buffer);
+      }
    }
 
    @Override
@@ -1442,7 +1463,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(enableMetrics) +
          BufferHelper.sizeOfNullableBoolean(defaultGroupRebalancePauseDispatch) +
          BufferHelper.sizeOfNullableInteger(managementMessageAttributeSizeLimit) +
-         BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue());
+         BufferHelper.sizeOfNullableInteger(slowConsumerThresholdMeasurementUnit.getValue()) +
+         BufferHelper.sizeOfNullableBoolean(enableIngressTimestamp);
    }
 
    @Override
@@ -1572,6 +1594,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       BufferHelper.writeNullableInteger(buffer, managementMessageAttributeSizeLimit);
 
       BufferHelper.writeNullableInteger(buffer, slowConsumerThresholdMeasurementUnit == null ? null : slowConsumerThresholdMeasurementUnit.getValue());
+
+      BufferHelper.writeNullableBoolean(buffer, enableIngressTimestamp);
    }
 
    /* (non-Javadoc)
@@ -1646,6 +1670,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((enableMetrics == null) ? 0 : enableMetrics.hashCode());
       result = prime * result + ((managementMessageAttributeSizeLimit == null) ? 0 : managementMessageAttributeSizeLimit.hashCode());
       result = prime * result + ((slowConsumerThresholdMeasurementUnit == null) ? 0 : slowConsumerThresholdMeasurementUnit.hashCode());
+      result = prime * result + ((enableIngressTimestamp == null) ? 0 : enableIngressTimestamp.hashCode());
       return result;
    }
 
@@ -2006,6 +2031,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (slowConsumerThresholdMeasurementUnit != other.slowConsumerThresholdMeasurementUnit)
          return false;
 
+      if (enableIngressTimestamp == null) {
+         if (other.enableIngressTimestamp != null)
+            return false;
+      } else if (!enableIngressTimestamp.equals(other.enableIngressTimestamp))
+         return false;
+
       return true;
    }
 
@@ -2141,6 +2172,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          expiryQueueSuffix +
          ", enableMetrics=" +
          enableMetrics +
+         ", enableIngressTime=" +
+         enableIngressTimestamp +
          "]";
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index ad612d2..614e056 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3970,6 +3970,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="enable-ingress-timestamp" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     whether or not the broker should set its own timestamp on incoming messages to the matching address
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
          </xsd:all>
 
          <xsd:attribute name="match" type="xsd:string" use="required">
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 7878098..07f11ab 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -384,6 +384,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
       assertEquals(0, conf.getAddressesSettings().get("a1").getRetroactiveMessageCount());
       assertTrue(conf.getAddressesSettings().get("a1").isEnableMetrics());
+      assertTrue(conf.getAddressesSettings().get("a1").isEnableIngressTimestamp());
 
       assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
       assertEquals(true, conf.getAddressesSettings().get("a2").isAutoCreateDeadLetterResources());
@@ -420,6 +421,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
       assertEquals(10, conf.getAddressesSettings().get("a2").getRetroactiveMessageCount());
       assertFalse(conf.getAddressesSettings().get("a2").isEnableMetrics());
+      assertFalse(conf.getAddressesSettings().get("a2").isEnableIngressTimestamp());
 
       assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
       assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
index a31e09f..0f20b82 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/message/impl/MessagePropertyTest.java
@@ -101,6 +101,7 @@ public class MessagePropertyTest extends ActiveMQTestBase {
             assertEquals(floatValue(i), message.getFloatProperty("float").floatValue(), 0.001);
             assertEquals(new SimpleString(Integer.toString(i)), message.getSimpleStringProperty(SIMPLE_STRING_KEY.toString()));
             assertEqualsByteArrays(byteArray(i), message.getBytesProperty("byte[]"));
+            assertNull(message.getIngressTimestamp());
 
             assertTrue(message.containsProperty("null-value"));
             assertEquals(message.getObjectProperty("null-value"), null);
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index a72459c..e0a6a5a 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -473,6 +473,7 @@
             <default-queue-routing-type>ANYCAST</default-queue-routing-type>
             <default-address-routing-type>MULTICAST</default-address-routing-type>
             <default-ring-size>3</default-ring-size>
+            <enable-ingress-timestamp>true</enable-ingress-timestamp>
          </address-setting>
          <address-setting match="a2">
             <dead-letter-address>a2.1</dead-letter-address>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 846c0e5..445c184 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -44,6 +44,7 @@
       <default-queue-routing-type>ANYCAST</default-queue-routing-type>
       <default-address-routing-type>MULTICAST</default-address-routing-type>
       <default-ring-size>3</default-ring-size>
+      <enable-ingress-timestamp>true</enable-ingress-timestamp>
    </address-setting>
    <address-setting match="a2">
       <dead-letter-address>a2.1</dead-letter-address>
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 599b871..e3635af 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -691,6 +691,7 @@ that would be found in the `broker.xml` file.
       <default-ring-size>-1</default-ring-size>
       <retroactive-message-count>0</retroactive-message-count>
       <enable-metrics>true</enable-metrics>
+      <enable-ingress-timestamp>false</enable-ingress-timestamp>
    </address-setting>
 </address-settings>
 ```
@@ -991,3 +992,12 @@ queues created on the matching address. Defaults to 0. Read more about
 `enable-metrics` determines whether or not metrics will be published to any
 configured metrics plugin for the matching address. Default is `true`. Read more
 about [metrics](metrics.md).
+
+`enable-ingress-timestamp` determines whether or not the broker will add its own
+receive time to messages sent to the matching address. When `true` the exact behavior will
+depend on the specific protocol in use. For AMQP messages the broker will add a
+`long` *message annotation* named `x-opt-ingress-time`. For core messages (used by
+the core and OpenWire protocols) the broker will add a `long` property named
+`_AMQ_INGRESS_TIMESTAMP`. For STOMP messages the broker will add a frame header 
+named `ingress-timestamp`. The value will be the number of milliseconds since the
+[epoch](https://en.wikipedia.org/wiki/Unix_time). Default is `false`.
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
index 7d65cdb..7e15442 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.amqp;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 import java.net.URI;
@@ -25,10 +31,17 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -385,6 +398,10 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
    }
 
    protected void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+      sendMessages(destinationName, count, durable, null);
+   }
+
+   protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       try {
@@ -395,6 +412,9 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
             AmqpMessage message = new AmqpMessage();
             message.setMessageId("MessageID:" + i);
             message.setDurable(durable);
+            if (payload != null) {
+               message.setBytes(payload);
+            }
             sender.send(message);
          }
       } finally {
@@ -402,6 +422,57 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
       }
    }
 
+   protected void sendMessagesCore(String destinationName, int count, boolean durable) throws Exception {
+      sendMessagesCore(destinationName, count, durable, null); // null body: the 4-arg overload then writes no payload
+   }
+
+   /** Sends {@code count} core messages to {@code destinationName}; {@code body} is written only when non-null. */
+   protected void sendMessagesCore(String destinationName, int count, boolean durable, byte[] body) throws Exception {
+      // try-with-resources closes the session, factory and locator (in that order) even if a send fails;
+      // previously only the session was closed, so the locator and factory leaked on every call.
+      try (ServerLocator serverLocator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:5672");
+           ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory();
+           ClientSession session = clientSessionFactory.createSession()) {
+         ClientProducer sender = session.createProducer(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            ClientMessage message = session.createMessage(durable);
+            if (body != null) {
+               message.getBodyBuffer().writeBytes(body);
+            }
+            sender.send(message);
+         }
+      }
+   }
+
+   protected void sendMessagesOpenWire(String destinationName, int count, boolean durable) throws Exception {
+      sendMessagesOpenWire(destinationName, count, durable, null); // null payload: the 4-arg overload then writes no bytes
+   }
+
+   /**
+    * Sends {@code count} OpenWire bytes messages to the named queue; {@code payload} is written only when non-null.
+    */
+   protected void sendMessagesOpenWire(String destinationName, int count, boolean durable, byte[] payload) throws Exception {
+      ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:5672");
+      // The connection is opened in try-with-resources so it is closed even when createSession() fails;
+      // previously the session was created before the try block and such a failure leaked the connection.
+      // As before, the session and producer are released by closing the connection they belong to.
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(destinationName));
+         int deliveryMode = durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+         producer.setDeliveryMode(deliveryMode);
+
+         for (int i = 0; i < count; ++i) {
+            BytesMessage message = session.createBytesMessage();
+            if (payload != null) {
+               message.writeBytes(payload);
+            }
+            producer.send(message);
+         }
+      }
+   }
+
    protected Source createDynamicSource(boolean topic) {
 
       Source source = new Source();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java
new file mode 100644
index 0000000..798aa32
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpIngressTimestampTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class AmqpIngressTimestampTest extends AmqpClientTestSupport {
+
+   public int amqpMinLargeMessageSize = ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE; // NOTE(review): never read in this class
+
+   @Parameterized.Parameters(name = "restart={0}, large={1}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true, true},
+         {false, false},
+         {true, false},
+         {false, true}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean restart; // stop and start the broker between the send and the receive
+
+   @Parameterized.Parameter(1)
+   public boolean large; // send DEFAULT_MIN_LARGE_MESSAGE_SIZE * 40 bytes of payload instead of 2 bytes
+
+   @Test(timeout = 60000)
+   public void testIngressTimestampSendCore() throws Exception {
+      internalTestIngressTimestamp(Protocol.CORE);
+   }
+
+   @Test(timeout = 60000)
+   public void testIngressTimestampSendAMQP() throws Exception {
+      internalTestIngressTimestamp(Protocol.AMQP);
+   }
+
+   @Test(timeout = 60000)
+   public void testIngressTimestampSendOpenWire() throws Exception {
+      internalTestIngressTimestamp(Protocol.OPENWIRE);
+   }
+
+   /**
+    * Sends one durable message via {@code protocol}, receives it over AMQP and checks that the X_OPT_INGRESS_TIME annotation is a Long within the send window.
+    */
+   private void internalTestIngressTimestamp(Protocol protocol) throws Exception {
+      final String QUEUE_NAME = RandomUtil.randomString();
+      server.createQueue(new QueueConfiguration(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST));
+      server.getAddressSettingsRepository().addMatch(QUEUE_NAME, new AddressSettings().setEnableIngressTimestamp(true));
+      long beforeSend = System.currentTimeMillis();
+      if (protocol == Protocol.CORE) {
+         sendMessagesCore(QUEUE_NAME, 1, true, getMessagePayload());
+      } else if (protocol == Protocol.OPENWIRE) {
+         sendMessagesOpenWire(QUEUE_NAME, 1, true, getMessagePayload());
+      } else {
+         sendMessages(QUEUE_NAME, 1, true, getMessagePayload());
+      }
+      long afterSend = System.currentTimeMillis();
+
+      if (restart) {
+         server.stop();
+         server.start();
+         assertTrue(server.waitForActivation(3, TimeUnit.SECONDS));
+      }
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpReceiver receiver = session.createReceiver(QUEUE_NAME);
+         Queue queueView = getProxyToQueue(QUEUE_NAME);
+         Wait.assertEquals(1L, queueView::getMessageCount, 2000, 100, false);
+         receiver.flow(1);
+         AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+         assertNotNull(receive);
+         instanceLog.info(receive);
+         Object ingressTimestampHeader = receive.getMessageAnnotation(AMQPMessageSupport.X_OPT_INGRESS_TIME);
+         assertNotNull(ingressTimestampHeader);
+         assertTrue(ingressTimestampHeader instanceof Long);
+         long ingressTimestamp = (Long) ingressTimestampHeader;
+         assertTrue("Ingress timestamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend, ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend);
+         receiver.close();
+
+         assertEquals(1, queueView.getMessageCount());
+      } finally {
+         connection.close(); // runs even when an assertion above fails
+      }
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+
+   private enum Protocol {
+      CORE, AMQP, OPENWIRE
+   }
+
+   @Override
+   protected void setData(AmqpMessage amqpMessage) throws Exception {
+      amqpMessage.setBytes(getMessagePayload());
+   }
+
+   @Override
+   protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
+      params.put("amqpMinLargeMessageSize", ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+   }
+
+   // Returns "AB" repeated as raw bytes; filled directly so the content does not depend on the platform charset.
+   private byte[] getMessagePayload() {
+      int repetitions = large ? ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20 : 1;
+      byte[] payload = new byte[repetitions * 2];
+      for (int i = 0; i < payload.length; i += 2) {
+         payload[i] = 'A';
+         payload[i + 1] = 'B';
+      }
+      return payload;
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java
new file mode 100644
index 0000000..2f9d1ad
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.message.JmsTextMessage;
+import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class IngressTimestampTest extends ActiveMQTestBase {
+   private ActiveMQServer server;
+
+   private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+   @Parameterized.Parameters(name = "restart={0}, large={1}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true, true},
+         {false, false},
+         {true, false},
+         {false, true}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean restart; // stop and start the broker between the send and the receive
+
+   @Parameterized.Parameter(1)
+   public boolean large; // send "AB" repeated DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20 times instead of once
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.start();
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));
+      server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
+   }
+
+   @Test
+   public void testSendCoreReceiveAMQP() throws Throwable {
+      internalSendReceive(Protocol.CORE, Protocol.AMQP);
+   }
+
+   @Test
+   public void testSendAMQPReceiveAMQP() throws Throwable {
+      internalSendReceive(Protocol.AMQP, Protocol.AMQP);
+   }
+
+   @Test
+   public void testSendOpenWireReceiveAMQP() throws Throwable {
+      internalSendReceive(Protocol.OPENWIRE, Protocol.AMQP);
+   }
+
+   @Test
+   public void testSendCoreReceiveCore() throws Throwable {
+      internalSendReceive(Protocol.CORE, Protocol.CORE);
+   }
+
+   @Test
+   public void testSendAMQPReceiveCore() throws Throwable {
+      internalSendReceive(Protocol.AMQP, Protocol.CORE);
+   }
+
+   @Test
+   public void testSendOpenWireReceiveCore() throws Throwable {
+      internalSendReceive(Protocol.OPENWIRE, Protocol.CORE);
+   }
+
+   @Test
+   public void testSendCoreReceiveOpenwire() throws Throwable {
+      internalSendReceive(Protocol.CORE, Protocol.OPENWIRE);
+   }
+
+   @Test
+   public void testSendAMQPReceiveOpenWire() throws Throwable {
+      internalSendReceive(Protocol.AMQP, Protocol.OPENWIRE);
+   }
+
+   @Test
+   public void testSendOpenWireReceiveOpenWire() throws Throwable {
+      internalSendReceive(Protocol.OPENWIRE, Protocol.OPENWIRE);
+   }
+
+   /**
+    * Sends one persistent text message with {@code protocolSender}, restarts the broker when {@code restart} is set,
+    * then receives it with {@code protocolConsumer} and checks the ingress timestamp is a Long within the send window.
+    */
+   private void internalSendReceive(Protocol protocolSender, Protocol protocolConsumer) throws Throwable {
+      ConnectionFactory factorySend = createFactory(protocolSender);
+      ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
+
+      long beforeSend, afterSend;
+      try (Connection connection = factorySend.createConnection();
+           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+           MessageProducer producer = session.createProducer(session.createQueue(QUEUE.toString()))) {
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+         TextMessage msg = session.createTextMessage(getMessagePayload());
+         beforeSend = System.currentTimeMillis();
+         producer.send(msg);
+         afterSend = System.currentTimeMillis();
+      }
+
+      if (restart) {
+         server.stop();
+         server.start();
+         assertTrue(server.waitForActivation(3, TimeUnit.SECONDS));
+      }
+
+      try (Connection connection = factoryConsume.createConnection()) {
+         connection.start();
+         try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+              MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE.toString()))) {
+            TextMessage message = (TextMessage) consumer.receive(1000);
+            Assert.assertNotNull(message);
+            // Diagnostic output only: lists the property names the consumer can see.
+            Enumeration<?> e = message.getPropertyNames();
+            while (e.hasMoreElements()) {
+               System.out.println(e.nextElement());
+            }
+            Object ingressTimestampHeader;
+            if (protocolConsumer == Protocol.AMQP) {
+               // Qpid JMS doesn't expose message annotations so we must use reflection here
+               Method getMessageAnnotation = AmqpJmsMessageFacade.class.getDeclaredMethod("getMessageAnnotation", Symbol.class);
+               getMessageAnnotation.setAccessible(true);
+               ingressTimestampHeader = getMessageAnnotation.invoke(((JmsTextMessage)message).getFacade(), Symbol.getSymbol(AMQPMessageSupport.X_OPT_INGRESS_TIME));
+            } else {
+               ingressTimestampHeader = message.getObjectProperty(Message.HDR_INGRESS_TIMESTAMP.toString());
+            }
+            assertNotNull(ingressTimestampHeader);
+            assertTrue(ingressTimestampHeader instanceof Long);
+            long ingressTimestamp = (Long) ingressTimestampHeader;
+            assertTrue("Ingress timestamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend, ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend);
+         }
+      }
+   }
+
+   /**
+    * Returns "AB" once, or repeated DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20 times when {@code large} is set.
+    */
+   private String getMessagePayload() {
+      int repetitions = large ? ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE * 20 : 1;
+      StringBuilder result = new StringBuilder(repetitions * 2);
+      for (int i = 0; i < repetitions; i++) {
+         result.append("AB");
+      }
+      return result.toString();
+   }
+
+   // Creates the JMS connection factory for the given protocol; an unknown value fails here instead of yielding null.
+   private ConnectionFactory createFactory(Protocol protocol) {
+      switch (protocol) {
+         case CORE: return new ActiveMQConnectionFactory(); // core protocol
+         case AMQP: return new JmsConnectionFactory("amqp://localhost:61616"); // amqp
+         case OPENWIRE: return new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616"); // openwire
+         default: throw new IllegalArgumentException("Unsupported protocol: " + protocol);
+      }
+   }
+
+   private enum Protocol {
+      CORE, AMQP, OPENWIRE
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 24fc087..69428e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -638,6 +638,27 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testIngressTimestamp() throws Exception {
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));
+      conn.connect(defUser, defPass);
+      // Subscribe first, then bracket the send with wall-clock readings: the ingress timestamp must fall in that window.
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
+      long beforeSend = System.currentTimeMillis();
+      sendJmsMessage(getName());
+      long afterSend = System.currentTimeMillis();
+
+      ClientStompFrame frame = conn.receiveFrame(10000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+      String ingressTimestampHeader = frame.getHeader(Stomp.Headers.Message.INGRESS_TIMESTAMP);
+      Assert.assertNotNull(ingressTimestampHeader);
+      long ingressTimestamp = Long.parseLong(ingressTimestampHeader); // the header arrives as a decimal string
+      assertTrue("Ingress timestamp " + ingressTimestamp + " should be >= " + beforeSend + " and <= " + afterSend, ingressTimestamp >= beforeSend && ingressTimestamp <= afterSend);
+
+      conn.disconnect();
+   }
+
+   @Test
    public void testAnycastDestinationTypeMessageProperty() throws Exception {
       conn.connect(defUser, defPass);