You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/02/12 21:39:27 UTC
[activemq-artemis] branch master updated: ARTEMIS-3116 Fixing
Core->AMQP Conversion of Scheduled Delivery Time
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 7d10e91 ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time
new d2f0603 This closes #3453
7d10e91 is described below
commit 7d10e915b6d28e10b4cddc36fed53eed20cf9cbc
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Feb 12 10:56:32 2021 -0500
ARTEMIS-3116 Fixing Core->AMQP Conversion of Scheduled Delivery Time
---
.../protocol/amqp/broker/AMQPStandardMessage.java | 63 +++++++++++++++
.../protocol/amqp/converter/CoreAmqpConverter.java | 55 ++-----------
.../amqp/converter/message/DirectConvertTest.java | 79 ++++++++++++++++++
.../message/MessageTransformationTest.java | 2 +-
.../AMQPScheduledCoreOverBrokerConnectTest.java | 94 ++++++++++++++++++++++
5 files changed, 245 insertions(+), 48 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
index cb2133b..ce2e374 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java
@@ -17,17 +17,27 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
+import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Footer;
+import org.apache.qpid.proton.amqp.messaging.Header;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
+import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
@@ -35,6 +45,59 @@ import org.apache.qpid.proton.codec.WritableBuffer;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPStandardMessage extends AMQPMessage {
+
+ public static AMQPStandardMessage createMessage(long messageID,
+ long messageFormat,
+ SimpleString replyTo,
+ Header header,
+ Properties properties,
+ Map<Symbol, Object> daMap,
+ Map<Symbol, Object> maMap,
+ Map<String, Object> apMap,
+ Map<Symbol, Object> footerMap,
+ Section body) {
+ ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
+
+ try {
+ EncoderImpl encoder = TLSEncode.getEncoder();
+ encoder.setByteBuffer(new NettyWritable(buffer));
+
+ if (header != null) {
+ encoder.writeObject(header);
+ }
+ if (daMap != null) {
+ encoder.writeObject(new DeliveryAnnotations(daMap));
+ }
+ if (maMap != null) {
+ encoder.writeObject(new MessageAnnotations(maMap));
+ }
+ if (properties != null) {
+ encoder.writeObject(properties);
+ }
+ if (apMap != null) {
+ encoder.writeObject(new ApplicationProperties(apMap));
+ }
+ if (body != null) {
+ encoder.writeObject(body);
+ }
+ if (footerMap != null) {
+ encoder.writeObject(new Footer(footerMap));
+ }
+
+ byte[] data = new byte[buffer.writerIndex()];
+ buffer.readBytes(data);
+
+ AMQPStandardMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null);
+ amqpMessage.setMessageID(messageID);
+ amqpMessage.setReplyTo(replyTo);
+ return amqpMessage;
+
+ } finally {
+ TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
+ buffer.release();
+ }
+ }
+
// Buffer and state for the data backing this message.
protected ReadableBuffer data;
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
index 2fa15fe..ee7659e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/CoreAmqpConverter.java
@@ -57,7 +57,6 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.ConversionException;
import org.apache.activemq.artemis.protocol.amqp.converter.coreWrapper.CoreMessageWrapper;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException;
-import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
@@ -65,22 +64,13 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
-import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecoderImpl;
-import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
-import org.apache.qpid.proton.codec.WritableBuffer;
import org.jboss.logging.Logger;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
-
public class CoreAmqpConverter {
private static Logger logger = Logger.getLogger(CoreAmqpConverter.class);
@@ -108,7 +98,6 @@ public class CoreAmqpConverter {
CoreMessageWrapper message = CoreMessageWrapper.wrap(coreMessage);
message.decode();
- long messageFormat = 0;
Header header = null;
final Properties properties = new Properties();
Map<Symbol, Object> daMap = null;
@@ -158,6 +147,12 @@ public class CoreAmqpConverter {
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, AMQPMessageSupport.destinationType(replyTo.toString()));
}
+ long scheduledDelivery = coreMessage.getScheduledDeliveryTime();
+
+ if (scheduledDelivery > 0) {
+ maMap.put(AMQPMessageSupport.SCHEDULED_DELIVERY_TIME, scheduledDelivery);
+ }
+
Object correlationID = message.getInnerMessage().getCorrelationID();
if (correlationID instanceof String || correlationID instanceof SimpleString) {
String c = correlationID instanceof String ? ((String) correlationID) : ((SimpleString) correlationID).toString();
@@ -314,42 +309,8 @@ public class CoreAmqpConverter {
apMap.put(key, objectProperty);
}
- ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024);
-
- try {
- EncoderImpl encoder = TLSEncode.getEncoder();
- encoder.setByteBuffer(new NettyWritable(buffer));
-
- if (header != null) {
- encoder.writeObject(header);
- }
- if (daMap != null) {
- encoder.writeObject(new DeliveryAnnotations(daMap));
- }
- encoder.writeObject(new MessageAnnotations(maMap));
- encoder.writeObject(properties);
- if (apMap != null) {
- encoder.writeObject(new ApplicationProperties(apMap));
- }
- if (body != null) {
- encoder.writeObject(body);
- }
- if (footerMap != null) {
- encoder.writeObject(new Footer(footerMap));
- }
-
- byte[] data = new byte[buffer.writerIndex()];
- buffer.readBytes(data);
-
- AMQPMessage amqpMessage = new AMQPStandardMessage(messageFormat, data, null);
- amqpMessage.setMessageID(message.getInnerMessage().getMessageID());
- amqpMessage.setReplyTo(coreMessage.getReplyTo());
- return amqpMessage;
-
- } finally {
- TLSEncode.getEncoder().setByteBuffer((WritableBuffer) null);
- buffer.release();
- }
+ long messageID = message.getInnerMessage().getMessageID();
+ return AMQPStandardMessage.createMessage(messageID, 0, replyTo, header, properties, daMap, maMap, apMap, footerMap, body);
}
private static Object decodeEmbeddedAMQPType(Object payload) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java
new file mode 100644
index 0000000..1be184c
--- /dev/null
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/DirectConvertTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.converter.message;
+
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DirectConvertTest {
+
+ @Test
+ public void testConvertScheduledAMQPCore() {
+ long deliveryTime = System.currentTimeMillis() + 10_000;
+ AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0,
+ null, null, null,
+ null, null, null, null, null);
+ standardMessage.setScheduledDeliveryTime(deliveryTime);
+
+ ICoreMessage coreMessage = standardMessage.toCore();
+
+ Assert.assertEquals((Long)deliveryTime, coreMessage.getScheduledDeliveryTime());
+ }
+
+
+ @Test
+ public void testConvertTTLdAMQPCore() {
+ long time = System.currentTimeMillis() + 10_000;
+ AMQPStandardMessage standardMessage = AMQPStandardMessage.createMessage(1, 0,
+ null, null, null,
+ null, null, null, null, null);
+ standardMessage.setExpiration(time);
+
+ ICoreMessage coreMessage = standardMessage.toCore();
+
+ Assert.assertEquals(time, coreMessage.getExpiration());
+ }
+
+ @Test
+ public void testConvertScheduledCoreAMQP() throws Exception {
+ long deliveryTime = System.currentTimeMillis() + 10_000;
+ CoreMessage coreMessage = new CoreMessage();
+ coreMessage.setScheduledDeliveryTime(deliveryTime);
+ coreMessage.initBuffer(1024);
+
+ AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager());
+ Assert.assertEquals((Long)deliveryTime, amqpMessage.getScheduledDeliveryTime());
+ }
+
+ @Test
+ public void testConvertTTLCoreAMQP() throws Exception {
+ long time = System.currentTimeMillis() + 10_000;
+ CoreMessage coreMessage = new CoreMessage();
+ coreMessage.setExpiration(time);
+ coreMessage.initBuffer(1024);
+
+ AMQPMessage amqpMessage = CoreAmqpConverter.fromCore(coreMessage, new NullStorageManager());
+ Assert.assertEquals(time, amqpMessage.getExpiration());
+ }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
index c4c527e..f89e0d7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/message/MessageTransformationTest.java
@@ -149,7 +149,7 @@ public class MessageTransformationTest {
AMQPMessage outboudMessage = AMQPConverter.getInstance().fromCore(core, null);
assertEquals(10, outboudMessage.getApplicationProperties().getValue().size());
- assertEquals(4, outboudMessage.getMessageAnnotations().getValue().size());
+ assertEquals(5, outboudMessage.getMessageAnnotations().getValue().size());
}
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java
new file mode 100644
index 0000000..eaa55fc
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPScheduledCoreOverBrokerConnectTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AMQPScheduledCoreOverBrokerConnectTest extends AmqpClientTestSupport {
+
+ protected static final int AMQP_PORT_2 = 5673;
+
+ ActiveMQServer server_2;
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ return createServer(AMQP_PORT, false);
+ }
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP,OPENWIRE,CORE";
+ }
+
+ @Test
+ public void testWithDeliveryDelayCoreSendingConversion() throws Exception {
+ String queueName = "withScheduled";
+ server.setIdentity("targetServer");
+ server.start();
+ server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(queueName), RoutingType.ANYCAST));
+ server.createQueue(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+ server_2 = createServer(AMQP_PORT_2, false);
+
+ AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT);
+ amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setType(AMQPBrokerConnectionAddressType.MIRROR));
+ server_2.getConfiguration().addAMQPConnection(amqpConnection);
+ server_2.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(queueName).addRoutingType(RoutingType.ANYCAST));
+ server_2.getConfiguration().addQueueConfiguration(new QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+ server_2.setIdentity("serverWithBridge");
+
+ server_2.start();
+ Wait.assertTrue(server_2::isStarted);
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:" + AMQP_PORT_2);
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(queueName));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ producer.setDeliveryDelay(300_000);
+ producer.send(session.createMessage());
+
+ ConnectionFactory factory2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+ Connection connection2 = factory2.createConnection();
+ Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection2.start();
+
+ MessageConsumer consumer = session2.createConsumer(session2.createQueue(queueName));
+ Assert.assertNull(consumer.receive(500));
+ }
+}