You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/12 21:39:27 UTC

[activemq-artemis] branch master updated: ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d10e91  ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time
     new d2f0603  This closes #3453
7d10e91 is described below

commit 7d10e915b6d28e10b4cddc36fed53eed20cf9cbc
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Feb 12 10:56:32 2021 -0500

    ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time
---
 .../protocol/amqp/broker/AMQPStandardMessage.java  | 63 +++++++++++++++
 .../protocol/amqp/converter/CoreAmqpConverter.java | 55 ++-----------
 .../amqp/converter/message/DirectConvertTest.java  | 79 ++++++++++++++++++
 .../message/MessageTransformationTest.java         |  2 +-
 .../AMQPScheduledCoreOverBrokerConnectTest.java    | 94 ++++++++++++++++++++++
 5 files changed, 245 insertions(+), 48 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index cb2133b..ce2e374 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -17,17 +17,27 @@
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.persistence.Persister;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.codec.WritableBuffer;
@@ -35,6 +45,59 @@ import org.apache.qpid.proton.codec.WritableBuffer;
 // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
 public class AMQPStandardMessage extends AMQPMessage {
 
+   /** Encodes the supplied (non-null) AMQP sections into one byte array and wraps it in a new AMQPStandardMessage carrying the given message ID and reply-to. */
+   public static AMQPStandardMessage createMessage(long messageID,
+                                                   long messageFormat,
+                                                   SimpleString replyTo,
+                                                   Header header,
+                                                   Properties properties,
+                                                   Map<Symbol, Object> daMap,
+                                                   Map<Symbol, Object> maMap,
+                                                   Map<String, Object> apMap,
+                                                   Map<Symbol, Object> footerMap,
+                                                   Section body) {
+      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); // scratch heap buffer, 1024 bytes initial capacity; released in the finally block
+
+      try {
+         EncoderImpl encoder = TLSEncode.getEncoder(); // encoder handed out by TLSEncode (presumably per-thread; confirm in TLSEncode)
+         encoder.setByteBuffer(new NettyWritable(buffer)); // direct the encoder's output into the scratch buffer
+         // Each section is written only when supplied, in this order: header, delivery annotations, message annotations, properties, application properties, body, footer.
+         if (header != null) {
+            encoder.writeObject(header);
+         }
+         if (daMap != null) {
+            encoder.writeObject(new DeliveryAnnotations(daMap));
+         }
+         if (maMap != null) {
+            encoder.writeObject(new MessageAnnotations(maMap));
+         }
+         if (properties != null) {
+            encoder.writeObject(properties);
+         }
+         if (apMap != null) {
+            encoder.writeObject(new ApplicationProperties(apMap));
+         }
+         if (body != null) {
+            encoder.writeObject(body);
+         }
+         if (footerMap != null) {
+            encoder.writeObject(new Footer(footerMap));
+         }
+         // Copy exactly the bytes written so far (writerIndex) out of the scratch buffer into a standalone array.
+         byte[] data = new byte[buffer.writerIndex()];
+         buffer.readBytes(data);
+
+         AMQPStandardMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null); // NOTE(review): meaning of the null third argument is not visible here
+         amqpMessage.setMessageID(messageID);
+         amqpMessage.setReplyTo(replyTo);
+         return amqpMessage;
+
+      } finally {
+         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null); // detach the encoder from the scratch buffer, even on failure
+         buffer.release(); // drop this method's reference to the pooled buffer
+      }
+   }
+
    // Buffer and state for the data backing this message.
    protected ReadableBuffer data;
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 2fa15fe..ee7659e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
 import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException;
 import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper;
 import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
@@ -65,22 +64,13 @@ import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
 import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
 import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
-import org.apache.qpid.proton.codec.WritableBuffer;
 import org.jboss.logging.Logger;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
 public class CoreAmqpConverter {
 
    private static Logger logger = Logger.getLogger(CoreAmqpConverter.class);
@@ -108,7 +98,6 @@ public class CoreAmqpConverter {
       CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage);
       message.decode();
 
-      long messageFormat = 0;
       Header header = null;
       final Properties properties = new Properties();
       Map<Symbol, Object> daMap = null;
@@ -158,6 +147,12 @@ public class CoreAmqpConverter {
          maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString()));
       }
 
+      long scheduledDelivery = coreMessage.getScheduledDeliveryTime();
+
+      if (scheduledDelivery > 0) {
+         maMap.put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledDelivery);
+      }
+
       Object correlationID = message.getInnerMessage().getCorrelationID();
       if (correlationID instanceof String || correlationID instanceof SimpleString) {
          String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
@@ -314,42 +309,8 @@ public class CoreAmqpConverter {
          apMap.put(key, objectProperty);
       }
 
-      ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
-      try {
-         EncoderImpl encoder = TLSEncode.getEncoder();
-         encoder.setByteBuffer(new NettyWritable(buffer));
-
-         if (header != null) {
-            encoder.writeObject(header);
-         }
-         if (daMap != null) {
-            encoder.writeObject(new DeliveryAnnotations(daMap));
-         }
-         encoder.writeObject(new MessageAnnotations(maMap));
-         encoder.writeObject(properties);
-         if (apMap != null) {
-            encoder.writeObject(new ApplicationProperties(apMap));
-         }
-         if (body != null) {
-            encoder.writeObject(body);
-         }
-         if (footerMap != null) {
-            encoder.writeObject(new Footer(footerMap));
-         }
-
-         byte[] data = new byte[buffer.writerIndex()];
-         buffer.readBytes(data);
-
-         AMQPMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null);
-         amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
-         amqpMessage.setReplyTo(coreMessage.getReplyTo());
-         return amqpMessage;
-
-      } finally {
-         TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
-         buffer.release();
-      }
+      long messageID = message.getInnerMessage().getMessageID();
+      return AMQPStandardMessage.createMessage(messageID, 0, replyTo, header, properties, daMap, maMap, apMap, footerMap, body);
    }
 
    private static Object decodeEmbeddedAMQPType(Object payload) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java
new file mode 100644
index 0000000..1be184c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
+import org.junit.Assert;
+import org.junit.Test;
+/** Checks that scheduled delivery time and expiration are carried across direct AMQP to Core and Core to AMQP message conversions. */
+public class DirectConvertTest {
+   /** AMQP to Core: the scheduled delivery time set on the AMQP message must be reported by the converted core message. */
+   @Test
+   public void testConvertScheduledAMQPCore() {
+      long deliveryTime = System.currentTimeMillis() + 10_000; // 10 seconds from now
+      AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0, // messageID 1, messageFormat 0, every other argument null
+                                                                              null, null, null,
+                                                                              null, null, null, null, null);
+      standardMessage.setScheduledDeliveryTime(deliveryTime);
+
+      ICoreMessage coreMessage = standardMessage.toCore();
+
+      Assert.assertEquals((Long)deliveryTime, coreMessage.getScheduledDeliveryTime()); // boxed to compare as objects; presumably the getter returns Long
+   }
+
+   /** AMQP to Core: the expiration set on the AMQP message must be reported by the converted core message. NOTE(review): the "d" in the method name looks like a typo. */
+   @Test
+   public void testConvertTTLdAMQPCore() {
+      long time = System.currentTimeMillis() + 10_000; // 10 seconds from now
+      AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0, // messageID 1, messageFormat 0, every other argument null
+                                                                              null, null, null,
+                                                                              null, null, null, null, null);
+      standardMessage.setExpiration(time);
+
+      ICoreMessage coreMessage = standardMessage.toCore();
+
+      Assert.assertEquals(time, coreMessage.getExpiration());
+   }
+   /** Core to AMQP: the scheduled delivery time set on the core message must be reported by the converted AMQP message. */
+   @Test
+   public void testConvertScheduledCoreAMQP() throws Exception {
+      long deliveryTime = System.currentTimeMillis() + 10_000; // 10 seconds from now
+      CoreMessage coreMessage = new CoreMessage();
+      coreMessage.setScheduledDeliveryTime(deliveryTime);
+      coreMessage.initBuffer(1024); // NOTE(review): presumably the converter needs an initialised body buffer; confirm
+
+      AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager());
+      Assert.assertEquals((Long)deliveryTime, amqpMessage.getScheduledDeliveryTime()); // boxed to compare as objects; presumably the getter returns Long
+   }
+   /** Core to AMQP: the expiration set on the core message must be reported by the converted AMQP message. */
+   @Test
+   public void testConvertTTLCoreAMQP() throws Exception {
+      long time = System.currentTimeMillis() + 10_000; // 10 seconds from now
+      CoreMessage coreMessage = new CoreMessage();
+      coreMessage.setExpiration(time);
+      coreMessage.initBuffer(1024); // NOTE(review): presumably the converter needs an initialised body buffer; confirm
+
+      AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager());
+      Assert.assertEquals(time, amqpMessage.getExpiration());
+   }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index c4c527e..f89e0d7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -149,7 +149,7 @@ public class MessageTransformationTest {
       AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null);
 
       assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
-      assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
+      assertEquals(5, outboudMessage.getMessageAnnotations().getValue().size());
    }
 
    private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java
new file mode 100644
index 0000000..eaa55fc
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+/** Sends a delayed message with a CORE client to a broker that mirrors over an AMQP broker connection, and checks the target broker does not deliver it early. */
+public class AMQPScheduledCoreOverBrokerConnectTest extends AmqpClientTestSupport {
+
+   protected static final int AMQP_PORT_2 = 5673; // acceptor port used for the second broker
+
+   ActiveMQServer server_2; // second broker, created and started inside the test; NOTE(review): never stopped here, confirm base-class teardown covers it
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false); // NOTE(review): "false" presumably means "do not start yet" (the test calls server.start() itself); confirm
+   }
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP,OPENWIRE,CORE";
+   }
+   /** A message sent with a 300 000 ms delivery delay to the mirroring broker must not be receivable on the target broker within 500 ms. */
+   @Test
+   public void testWithDeliveryDelayCoreSendingConversion() throws Exception {
+      String queueName = "withScheduled";
+      server.setIdentity("targetServer");
+      server.start();
+      server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      // Configure server_2 with an AMQP broker connection of type MIRROR pointing at the target server's port.
+      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+      amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR));
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+      server_2.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
+      server_2.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+      server_2.setIdentity("serverWithBridge");
+
+      server_2.start();
+      Wait.assertTrue(server_2::isStarted);
+      // Producer side: CORE protocol client connected to server_2.
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+      Connection connection = factory.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producer = session.createProducer(session.createQueue(queueName));
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+      producer.setDeliveryDelay(300_000); // delay in milliseconds (5 minutes)
+      producer.send(session.createMessage());
+      // Consumer side: AMQP protocol client connected to the target server.
+      ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+      Connection connection2 = factory2.createConnection();
+      Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      connection2.start();
+      // NOTE(review): neither JMS connection is closed in this method; confirm the test base class cleans them up.
+      MessageConsumer consumer = session2.createConsumer(session2.createQueue(queueName));
+      Assert.assertNull(consumer.receive(500)); // nothing may arrive within 500 ms while the delivery delay is pending
+   }
+}