You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2020/09/28 15:16:24 UTC

[GitHub] [activemq-artemis] gemmellr commented on a change in pull request #3279: ARTEMIS-2919 support timestamping incoming messages

gemmellr commented on a change in pull request #3279:
URL: https://github.com/apache/activemq-artemis/pull/3279#discussion_r495974587



##########
File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
##########
@@ -636,6 +641,15 @@ default Object getBrokerProperty(SimpleString key) {
       return getObjectProperty(key);
    }
 
+   default Message setIngressTimestamp() {
+      setBrokerProperty(HDR_INGRESS_TIMESTAMP, System.nanoTime());
+      return this;
+   }
+
+   default Object getIngressTimestamp() {

Review comment:
       Is Object needed here? Seems like its using a long everywhere (and could presumably be converted if not)

##########
File path: artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
##########
@@ -636,6 +641,15 @@ default Object getBrokerProperty(SimpleString key) {
       return getObjectProperty(key);
    }
 
+   default Message setIngressTimestamp() {
+      setBrokerProperty(HDR_INGRESS_TIMESTAMP, System.nanoTime());

Review comment:
       Same as other comment

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/IngressTimestampTest.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IngressTimestampTest extends ActiveMQTestBase {
+   private ActiveMQServer server;
+
+   private final SimpleString QUEUE = new SimpleString("ConsumerTestQueue");
+
+   @Before
+   @Override
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, true);
+      server.start();
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setEnableIngressTimestamp(true));
+      server.createQueue(new QueueConfiguration(QUEUE).setRoutingType(RoutingType.ANYCAST));
+   }
+
+   @Test
+   public void testSendCoreReceiveCore() throws Throwable {
+      internalSend(Protocol.CORE, Protocol.CORE);
+   }
+
+   @Test
+   public void testSendAMQPReceiveCore() throws Throwable {
+      internalSend(Protocol.AMQP, Protocol.CORE);
+   }
+
+   @Test
+   public void testSendOpenWireReceiveCore() throws Throwable {
+      internalSend(Protocol.OPENWIRE, Protocol.CORE);
+   }
+
+   @Test
+   public void testSendCoreReceiveOpenwire() throws Throwable {
+      internalSend(Protocol.CORE, Protocol.OPENWIRE);
+   }
+
+   @Test
+   public void testSendAMQPReceiveOpenWire() throws Throwable {
+      internalSend(Protocol.AMQP, Protocol.OPENWIRE);
+   }
+
+   @Test
+   public void testSendOpenWireReceiveOpenWire() throws Throwable {
+      internalSend(Protocol.OPENWIRE, Protocol.OPENWIRE);
+   }
+
+   /**
+    *  we don't test AMQP here because there's no way to access the message annotations from the Qpid JMS client
+    *
+   @Test

Review comment:
       Rather than the commented out tests, why not add to the test class in the amqp package.

##########
File path: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
##########
@@ -1317,6 +1317,17 @@ public Object getBrokerProperty(SimpleString key) {
       return extra.getProperty(key);
    }
 
+   @Override
+   public final org.apache.activemq.artemis.api.core.Message setIngressTimestamp() {
+      setMessageAnnotation(AMQPMessageSupport.INGRESS_TIMESTAMP, System.nanoTime());

Review comment:
       This is almost certainly not what should be done here, as its almost useless as an actual timestamp, particularly when observed from outwith the broker JVM, or between different broker executions. (EDIT: also doesnt match what the docs say later about the value being epoch based)
   
   System.nanoTime() is a monotonic time source but it gives no guarantee as to its starting point, which can differ between JVMs and even be negative. The only thing its good for is comparing (through subtraction) with prior values of System.nanoTime() sourced inside the same JVM to measure elapsed duration between them.
   
   https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--
   
   For use as an actual timestamp style value for general comparison the broker would need to do something different like stamp the broker or JVM startup point for later comparisons, and add an offset from the epoch.

##########
File path: docs/user-manual/en/address-model.md
##########
@@ -996,3 +997,14 @@ queues created on the matching address. Defaults to 0. Read more about
 `enable-metrics` determines whether or not metrics will be published to any
 configured metrics plugin for the matching address. Default is `true`. Read more
 about [metrics](metrics.md).
+
+`enable-ingress-timestamp` determines whether or not the broker will add its
+timestamp to messages sent to the matching address. When `true` the broker will
+treat AMQP and non-AMQP messages differently. For AMQP messages the broker will
+add a *message annotation* named `x-opt-ingress-timestamp`. For core messages
+(used by the core and OpenWire protocols) the broker will add a property named
+`_AMQ_INGRESS_TIMESTAMP`. For STOMP messages the broker will add a frame header
+named `ingress-timestamp`. The value will be a 64-bit
+[epoch](https://en.wikipedia.org/wiki/Unix_time) timestamp with
+[*nano*second](https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--)

Review comment:
       Per earlier comment, this doesnt match what the value sent does, as System.nanoTime() isnt epoch based. 

##########
File path: artemis-tools/src/test/resources/artemis-configuration.xsd
##########
@@ -3456,6 +3456,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="enable-ingress-timestamp" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">

Review comment:
       Similar to default value comment for earlier xsd file (this one looks test-only so may not matter though)

##########
File path: artemis-server/src/main/resources/schema/artemis-configuration.xsd
##########
@@ -3695,6 +3695,14 @@
                   </xsd:documentation>
                </xsd:annotation>
             </xsd:element>
+
+            <xsd:element name="enable-ingress-timestamp" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">

Review comment:
       Is this saying the functionality is enabled by default? The docs and code suggest otherwise.

##########
File path: docs/user-manual/en/address-model.md
##########
@@ -996,3 +997,14 @@ queues created on the matching address. Defaults to 0. Read more about
 `enable-metrics` determines whether or not metrics will be published to any
 configured metrics plugin for the matching address. Default is `true`. Read more
 about [metrics](metrics.md).
+
+`enable-ingress-timestamp` determines whether or not the broker will add its
+timestamp to messages sent to the matching address. When `true` the broker will
+treat AMQP and non-AMQP messages differently. For AMQP messages the broker will
+add a *message annotation* named `x-opt-ingress-timestamp`. For core messages

Review comment:
       Should perhapssay it will be a "long" typed annotation.

##########
File path: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
##########
@@ -1260,5 +1263,33 @@ public void testReceiveRejecting() throws Exception {
       connection.close();
    }
 
+   @Test(timeout = 60000)
+   public void testIngressTimestamp() throws Exception {
+      server.getAddressSettingsRepository().addMatch(getQueueName(), new AddressSettings().setEnableIngressTimestamp(true));
+      sendMessages(getQueueName(), 1, false);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+
+      Queue queueView = getProxyToQueue(getQueueName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      instanceLog.info(receive);
+      Object ingressTimestamp = receive.getMessageAnnotation(AMQPMessageSupport.INGRESS_TIMESTAMP.toString());
+      assertNotNull(ingressTimestamp);
+      assertTrue(ingressTimestamp instanceof Date);

Review comment:
       This test fails, as it is checking for the wrong type. It should check for a long.
   
   It should also inspect the value and verify it is set within some delta (e.g <5sec since just before sending) so as to ensure the value is set properly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org