You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/03/21 22:53:39 UTC

[activemq-artemis] 01/02: ARTEMIS-3711 support AMQ_SCHEDULED_DELAY for OpenWire clients

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 3627ba57c9c48cc41d2036c913eb2c3c904a7743
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Tue Mar 8 11:06:00 2022 -0600

    ARTEMIS-3711 support AMQ_SCHEDULED_DELAY for OpenWire clients
---
 .../openwire/OpenWireMessageConverter.java         |  8 +++
 .../openwire/OpenWireScheduledDelayTest.java       | 82 ++++++++++++++++++++++
 2 files changed, 90 insertions(+)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index da74a43..e203b3a 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -38,6 +38,7 @@ import java.util.zip.InflaterInputStream;
 import java.util.zip.InflaterOutputStream;
 
 import com.google.common.io.BaseEncoding;
+import org.apache.activemq.ScheduledMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -224,6 +225,13 @@ public final class OpenWireMessageConverter {
          coreMessage.putStringProperty(AMQ_MSG_ORIG_DESTINATION, origDest.getQualifiedName());
       }
 
+      final Object scheduledDelay = messageSend.getProperties().get(ScheduledMessage.AMQ_SCHEDULED_DELAY);
+      if (scheduledDelay instanceof Long) {
+         coreMessage.putLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + ((Long) scheduledDelay));
+         // this property may have already been copied, but we don't need it anymore
+         coreMessage.removeProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);
+      }
+
       return coreMessage;
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java
new file mode 100644
index 0000000..b45c9b8
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireScheduledDelayTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.Map;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OpenWireScheduledDelayTest extends OpenWireTestBase {
+
+   @Override
+   protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) {
+      addressSettingsMap.put("#", new AddressSettings().setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")));
+   }
+
+   @Test
+   public void testScheduledDelay() throws Exception {
+      final String QUEUE_NAME = RandomUtil.randomString();
+      final long DELAY = 2000;
+      final String PROP_NAME = RandomUtil.randomString();
+      final String FIRST = RandomUtil.randomString();
+      final String SECOND = RandomUtil.randomString();
+
+      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
+      Connection connection = connectionFactory.createConnection();
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Destination destination = session.createQueue(QUEUE_NAME);
+      MessageProducer producer = session.createProducer(destination);
+      Message firstMessage = session.createMessage();
+      firstMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY);
+      firstMessage.setStringProperty(PROP_NAME, FIRST);
+      final long ETA = System.currentTimeMillis() + DELAY;
+      producer.send(firstMessage);
+      Message secondMessage = session.createMessage();
+      secondMessage.setStringProperty(PROP_NAME, SECOND);
+      producer.send(secondMessage);
+      producer.close();
+
+      MessageConsumer consumer = session.createConsumer(destination);
+
+      Message received = consumer.receive(250);
+      assertNotNull(received);
+      assertEquals(SECOND, received.getStringProperty(PROP_NAME));
+
+      received = consumer.receive(DELAY + 250);
+      assertNotNull(received);
+      assertEquals(FIRST, received.getStringProperty(PROP_NAME));
+      Assert.assertTrue(System.currentTimeMillis() >= ETA);
+
+      connection.close();
+   }
+}
+
+