You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/26 03:29:18 UTC
[activemq-artemis] branch master updated: ARTEMIS-2454 Fixing body re-encoding
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new c929d34 ARTEMIS-2454 Fixing body re-encoding
new ede3902 This closes #2809
c929d34 is described below
commit c929d34eed84d3114abc7075a809c5ee894ca2f3
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sun Aug 25 22:08:54 2019 -0400
ARTEMIS-2454 Fixing body re-encoding
---
.../artemis/protocol/amqp/broker/AMQPMessage.java | 47 ++++++------
.../protocol/amqp/converter/TestConversions.java | 62 +++++++++++----
.../artemis/core/server/impl/DivertImpl.java | 5 +-
.../tests/integration/divert/DivertTest.java | 88 ++++++++++++++++++++++
4 files changed, 160 insertions(+), 42 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 69976d6..a2e2238 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -208,8 +208,10 @@ public class AMQPMessage extends RefCountMessage {
* @return a MessageImpl that wraps the AMQP message data in this {@link AMQPMessage}
*/
public MessageImpl getProtonMessage() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ if (data == null) {
+ throw new NullPointerException("Data is not initialized");
+ }
+ ensureScanning();
MessageImpl protonMessage = null;
if (data != null) {
@@ -228,11 +230,15 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the Message Header if one exists or null if none present.
*/
public Header getHeader() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
return scanForMessageSection(headerPosition, Header.class);
}
+ private void ensureScanning() {
+ ensureDataIsValid();
+ ensureMessageDataScanned();
+ }
+
/**
* Returns a copy of the MessageAnnotations in the message if present or null. Changes to the
* returned DeliveryAnnotations instance do not affect the original Message.
@@ -240,8 +246,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the {@link DeliveryAnnotations} present in the message or null if non present.
*/
public DeliveryAnnotations getDeliveryAnnotations() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
}
@@ -267,8 +272,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the {@link MessageAnnotations} present in the message or null if non present.
*/
public MessageAnnotations getMessageAnnotations() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
return scanForMessageSection(messageAnnotationsPosition, MessageAnnotations.class);
}
@@ -279,8 +283,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the Message Properties if one exists or null if none present.
*/
public Properties getProperties() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
return scanForMessageSection(propertiesPosition, Properties.class);
}
@@ -291,8 +294,7 @@ public class AMQPMessage extends RefCountMessage {
* @return a copy of the {@link ApplicationProperties} present in the message or null if non present.
*/
public ApplicationProperties getApplicationProperties() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
}
@@ -310,8 +312,7 @@ public class AMQPMessage extends RefCountMessage {
* @return the Section that makes up the body of this message.
*/
public Section getBody() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
// We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
// There could also be a Footer and no body so this will prevent a faulty return type in case
@@ -327,8 +328,7 @@ public class AMQPMessage extends RefCountMessage {
* @return the Footer that was encoded into this AMQP Message.
*/
public Footer getFooter() {
- ensureMessageDataScanned();
- ensureDataIsValid();
+ ensureScanning();
return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
}
@@ -445,11 +445,11 @@ public class AMQPMessage extends RefCountMessage {
private synchronized void ensureMessageDataScanned() {
if (!messageDataScanned) {
scanMessageData();
- messageDataScanned = true;
}
}
private synchronized void scanMessageData() {
+ this.messageDataScanned = true;
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data.rewind());
@@ -781,21 +781,17 @@ public class AMQPMessage extends RefCountMessage {
encodeMessage();
scanMessageData();
-
- messageDataScanned = true;
- modified = false;
}
private synchronized void ensureDataIsValid() {
- assert data != null;
-
if (modified) {
encodeMessage();
- modified = false;
}
}
private synchronized void encodeMessage() {
+ this.modified = false;
+ this.messageDataScanned = false;
int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
EncoderImpl encoder = TLSEncode.getEncoder();
@@ -1542,14 +1538,15 @@ public class AMQPMessage extends RefCountMessage {
@Override
public String toString() {
- return "AMQPMessage [durable=" + isDurable() +
+ /* return "AMQPMessage [durable=" + isDurable() +
", messageID=" + getMessageID() +
", address=" + getAddress() +
", size=" + getEncodeSize() +
", applicationProperties=" + applicationProperties +
", properties=" + properties +
", extraProperties = " + getExtraProperties() +
- "]";
+ "]"; */
+ return super.toString();
}
@Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 37b1103..472e9d9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -17,18 +17,14 @@
package org.apache.activemq.artemis.protocol.amqp.converter;
import java.nio.ByteBuffer;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
@@ -59,9 +55,11 @@ import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
public class TestConversions extends Assert {
@@ -421,8 +419,8 @@ public class TestConversions extends Assert {
}
encodedMessage.messageChanged();
- AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
- Assert.assertEquals(text, (String)value.getValue());
+ AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
+ Assert.assertEquals(text, (String) value.getValue());
// this line is needed to force a failure
ICoreMessage coreMessage = encodedMessage.toCore();
@@ -452,11 +450,11 @@ public class TestConversions extends Assert {
encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
for (int i = 0; i < 100; i++) {
- encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
+ encodedMessage.putStringProperty("another" + i, "value" + i);
encodedMessage.messageChanged();
encodedMessage.reencode();
- AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
- Assert.assertEquals(text, (String)value.getValue());
+ AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
+ Assert.assertEquals(text, (String) value.getValue());
ICoreMessage coreMessage = encodedMessage.toCore();
if (logger.isDebugEnabled()) {
logger.debug("Converted message: " + coreMessage);
@@ -476,7 +474,43 @@ public class TestConversions extends Assert {
encodedMessage = encodeAndCreateAMQPMessage(message);
}
+ }
+ }
+ @Test
+ public void testExpandNoReencode() throws Exception {
+
+ Map<String, Object> mapprop = createPropertiesMap();
+ ApplicationProperties properties = new ApplicationProperties(mapprop);
+ properties.getValue().put("hello", "hello");
+ MessageImpl message = (MessageImpl) Message.Factory.create();
+ MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+ message.setMessageAnnotations(annotations);
+ message.setApplicationProperties(properties);
+
+ String text = "someText";
+ message.setBody(new AmqpValue(text));
+
+ AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+ TypedProperties extraProperties = new TypedProperties();
+ encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
+
+ for (int i = 0; i < 100; i++) {
+ encodedMessage.setMessageID(333L);
+ if (i % 3 == 0) {
+ encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
+ } else {
+ encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
+ }
+ encodedMessage.putStringProperty("another " + i, "value " + i);
+ encodedMessage.messageChanged();
+ if (i % 2 == 0) {
+ encodedMessage.setAddress("THIS-IS-A-BIG-THIS-IS-A-BIG-ADDRESS-THIS-IS-A-BIG-ADDRESS-RIGHT");
+ } else {
+ encodedMessage.setAddress("A"); // small address
+ }
+ encodedMessage.messageChanged();
+ ICoreMessage coreMessage = encodedMessage.toCore();
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 7fbf20c..07997c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -106,8 +106,6 @@ public class DivertImpl implements Divert {
copy.setExpiration(message.getExpiration());
- copy.reencode();
-
switch (routingType) {
case ANYCAST:
copy.setRoutingType(RoutingType.ANYCAST);
@@ -126,7 +124,8 @@ public class DivertImpl implements Divert {
copy = transformer.transform(copy);
}
- copy.messageChanged();
+ // We call reencode at the end only, in a single call.
+ copy.reencode();
} else {
copy = message;
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 67cccec..d3c2320 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -16,6 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.divert;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
@@ -34,6 +41,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -42,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -129,6 +138,85 @@ public class DivertTest extends ActiveMQTestBase {
}
@Test
+ public void testCrossProtocol() throws Exception {
+ final String testForConvert = "testConvert";
+
+ final String testAddress = "testAddress";
+
+ final String forwardAddress = "forwardAddress";
+
+ DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).
+ setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
+
+ Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf);
+
+ ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+ server.start();
+
+ final SimpleString queueName1 = SimpleString.toSimpleString(testAddress);
+
+ final SimpleString queueName2 = SimpleString.toSimpleString(forwardAddress);
+
+ { // this is setting up the queues
+ ServerLocator locator = createInVMNonHALocator();
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName1, null, true);
+
+ session.createQueue(new SimpleString(testForConvert), RoutingType.ANYCAST, SimpleString.toSimpleString(testForConvert), null, true);
+
+ session.createQueue(new SimpleString(forwardAddress), RoutingType.MULTICAST, queueName2, null, true);
+ }
+
+ ConnectionFactory coreCF = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+ Connection coreConnection = coreCF.createConnection();
+ Session coreSession = coreConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producerCore = coreSession.createProducer(coreSession.createQueue(testForConvert));
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage textMessage = coreSession.createTextMessage("text" + i);
+ //if (i % 2 == 0) textMessage.setIntProperty("key", i);
+ producerCore.send(textMessage);
+ }
+
+ producerCore.close();
+
+ ConnectionFactory amqpCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
+
+ Connection amqpConnection = amqpCF.createConnection();
+ Session amqpSession = amqpConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+ Queue amqpQueue = amqpSession.createQueue(testAddress);
+ MessageProducer producer = amqpSession.createProducer(amqpQueue);
+ MessageConsumer consumerFromConvert = amqpSession.createConsumer(amqpSession.createQueue(testForConvert));
+ amqpConnection.start();
+
+ for (int i = 0; i < 10; i++) {
+ javax.jms.Message received = consumerFromConvert.receive(5000);
+ Assert.assertNotNull(received);
+ producer.send(received);
+ }
+
+
+ Queue outQueue = coreSession.createQueue(queueName2.toString());
+ MessageConsumer consumer = coreSession.createConsumer(outQueue);
+ coreConnection.start();
+
+ for (int i = 0; i < 10; i++) {
+ TextMessage textMessage = (TextMessage)consumer.receive(5000);
+ Assert.assertNotNull(textMessage);
+ Assert.assertEquals("text" + i, textMessage.getText());
+ //if (i % 2 == 0) Assert.assertEquals(i, textMessage.getIntProperty("key"));
+ }
+
+ Assert.assertNull(consumer.receiveNoWait());
+
+ }
+
+ @Test
public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
final String testAddress = "testAddress";