You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/26 03:29:18 UTC

[activemq-artemis] branch master updated: ARTEMIS-2454 Fixing body re-encoding

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c929d34  ARTEMIS-2454 Fixing body re-encoding
     new ede3902  This closes #2809
c929d34 is described below

commit c929d34eed84d3114abc7075a809c5ee894ca2f3
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sun Aug 25 22:08:54 2019 -0400

    ARTEMIS-2454 Fixing body re-encoding
---
 .../artemis/protocol/amqp/broker/AMQPMessage.java  | 47 ++++++------
 .../protocol/amqp/converter/TestConversions.java   | 62 +++++++++++----
 .../artemis/core/server/impl/DivertImpl.java       |  5 +-
 .../tests/integration/divert/DivertTest.java       | 88 ++++++++++++++++++++++
 4 files changed, 160 insertions(+), 42 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 69976d6..a2e2238 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -208,8 +208,10 @@ public class AMQPMessage extends RefCountMessage {
     * @return a MessageImpl that wraps the AMQP message data in this {@link AMQPMessage}
     */
    public MessageImpl getProtonMessage() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      if (data == null) {
+         throw new NullPointerException("Data is not initialized");
+      }
+      ensureScanning();
 
       MessageImpl protonMessage = null;
       if (data != null) {
@@ -228,11 +230,15 @@ public class AMQPMessage extends RefCountMessage {
     * @return a copy of the Message Header if one exists or null if none present.
     */
    public Header getHeader() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
       return scanForMessageSection(headerPosition, Header.class);
    }
 
+   private void ensureScanning() {
+      ensureDataIsValid();
+      ensureMessageDataScanned();
+   }
+
    /**
     * Returns a copy of the MessageAnnotations in the message if present or null.  Changes to the
     * returned DeliveryAnnotations instance do not affect the original Message.
@@ -240,8 +246,7 @@ public class AMQPMessage extends RefCountMessage {
     * @return a copy of the {@link DeliveryAnnotations} present in the message or null if non present.
     */
    public DeliveryAnnotations getDeliveryAnnotations() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
       return scanForMessageSection(deliveryAnnotationsPosition, DeliveryAnnotations.class);
    }
 
@@ -267,8 +272,7 @@ public class AMQPMessage extends RefCountMessage {
     * @return a copy of the {@link MessageAnnotations} present in the message or null if non present.
     */
    public MessageAnnotations getMessageAnnotations() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
       return scanForMessageSection(messageAnnotationsPosition, MessageAnnotations.class);
    }
 
@@ -279,8 +283,7 @@ public class AMQPMessage extends RefCountMessage {
     * @return a copy of the Message Properties if one exists or null if none present.
     */
    public Properties getProperties() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
       return scanForMessageSection(propertiesPosition, Properties.class);
    }
 
@@ -291,8 +294,7 @@ public class AMQPMessage extends RefCountMessage {
     * @return a copy of the {@link ApplicationProperties} present in the message or null if non present.
     */
    public ApplicationProperties getApplicationProperties() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
       return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
    }
 
@@ -310,8 +312,7 @@ public class AMQPMessage extends RefCountMessage {
     * @return the Section that makes up the body of this message.
     */
    public Section getBody() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
 
       // We only handle Sections of AmqpSequence, AmqpValue and Data types so we filter on those.
       // There could also be a Footer and no body so this will prevent a faulty return type in case
@@ -327,8 +328,7 @@ public class AMQPMessage extends RefCountMessage {
     * @return the Footer that was encoded into this AMQP Message.
     */
    public Footer getFooter() {
-      ensureMessageDataScanned();
-      ensureDataIsValid();
+      ensureScanning();
       return scanForMessageSection(Math.max(0, remainingBodyPosition), Footer.class);
    }
 
@@ -445,11 +445,11 @@ public class AMQPMessage extends RefCountMessage {
    private synchronized void ensureMessageDataScanned() {
       if (!messageDataScanned) {
          scanMessageData();
-         messageDataScanned = true;
       }
    }
 
    private synchronized void scanMessageData() {
+      this.messageDataScanned = true;
       DecoderImpl decoder = TLSEncode.getDecoder();
       decoder.setBuffer(data.rewind());
 
@@ -781,21 +781,17 @@ public class AMQPMessage extends RefCountMessage {
 
       encodeMessage();
       scanMessageData();
-
-      messageDataScanned = true;
-      modified = false;
    }
 
    private synchronized void ensureDataIsValid() {
-      assert data != null;
-
       if (modified) {
          encodeMessage();
-         modified = false;
       }
    }
 
    private synchronized void encodeMessage() {
+      this.modified = false;
+      this.messageDataScanned = false;
       int estimated = Math.max(1500, data != null ? data.capacity() + 1000 : 0);
       ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(estimated);
       EncoderImpl encoder = TLSEncode.getEncoder();
@@ -1542,14 +1538,15 @@ public class AMQPMessage extends RefCountMessage {
 
    @Override
    public String toString() {
-      return "AMQPMessage [durable=" + isDurable() +
+      /* return "AMQPMessage [durable=" + isDurable() +
          ", messageID=" + getMessageID() +
          ", address=" + getAddress() +
          ", size=" + getEncodeSize() +
          ", applicationProperties=" + applicationProperties +
          ", properties=" + properties +
          ", extraProperties = " + getExtraProperties() +
-         "]";
+         "]"; */
+      return super.toString();
    }
 
    @Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 37b1103..472e9d9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -17,18 +17,14 @@
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
 import java.nio.ByteBuffer;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
-import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
-
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
@@ -59,9 +55,11 @@ import org.jboss.logging.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.AMQP_NULL;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_DELIVERY_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_FOOTER_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ENCODED_MESSAGE_ANNOTATION_PREFIX;
+import static org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.JMS_AMQP_ORIGINAL_ENCODING;
 
 public class TestConversions extends Assert {
 
@@ -421,8 +419,8 @@ public class TestConversions extends Assert {
          }
 
          encodedMessage.messageChanged();
-         AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
-         Assert.assertEquals(text, (String)value.getValue());
+         AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
+         Assert.assertEquals(text, (String) value.getValue());
 
          // this line is needed to force a failure
          ICoreMessage coreMessage = encodedMessage.toCore();
@@ -452,11 +450,11 @@ public class TestConversions extends Assert {
       encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
 
       for (int i = 0; i < 100; i++) {
-         encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
+         encodedMessage.putStringProperty("another" + i, "value" + i);
          encodedMessage.messageChanged();
          encodedMessage.reencode();
-         AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
-         Assert.assertEquals(text, (String)value.getValue());
+         AmqpValue value = (AmqpValue) encodedMessage.getProtonMessage().getBody();
+         Assert.assertEquals(text, (String) value.getValue());
          ICoreMessage coreMessage = encodedMessage.toCore();
          if (logger.isDebugEnabled()) {
             logger.debug("Converted message: " + coreMessage);
@@ -476,7 +474,43 @@ public class TestConversions extends Assert {
             encodedMessage = encodeAndCreateAMQPMessage(message);
 
          }
+      }
+   }
 
+   @Test
+   public void testExpandNoReencode() throws Exception {
+
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      properties.getValue().put("hello", "hello");
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      message.setMessageAnnotations(annotations);
+      message.setApplicationProperties(properties);
+
+      String text = "someText";
+      message.setBody(new AmqpValue(text));
+
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+      TypedProperties extraProperties = new TypedProperties();
+      encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
+
+      for (int i = 0; i < 100; i++) {
+         encodedMessage.setMessageID(333L);
+         if (i % 3 == 0) {
+            encodedMessage.referenceOriginalMessage(encodedMessage, "SOME-OTHER-QUEUE-DOES-NOT-MATTER-WHAT");
+         } else {
+            encodedMessage.referenceOriginalMessage(encodedMessage, "XXX");
+         }
+         encodedMessage.putStringProperty("another " + i, "value " + i);
+         encodedMessage.messageChanged();
+         if (i % 2 == 0) {
+            encodedMessage.setAddress("THIS-IS-A-BIG-THIS-IS-A-BIG-ADDRESS-THIS-IS-A-BIG-ADDRESS-RIGHT");
+         } else {
+            encodedMessage.setAddress("A"); // small address
+         }
+         encodedMessage.messageChanged();
+         ICoreMessage coreMessage = encodedMessage.toCore();
       }
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
index 7fbf20c..07997c5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java
@@ -106,8 +106,6 @@ public class DivertImpl implements Divert {
 
          copy.setExpiration(message.getExpiration());
 
-         copy.reencode();
-
          switch (routingType) {
             case ANYCAST:
                copy.setRoutingType(RoutingType.ANYCAST);
@@ -126,7 +124,8 @@ public class DivertImpl implements Divert {
             copy = transformer.transform(copy);
          }
 
-         copy.messageChanged();
+         // We call reencode at the end only, in a single call.
+         copy.reencode();
       } else {
          copy = message;
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
index 67cccec..d3c2320 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.divert;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
@@ -34,6 +41,7 @@ import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.api.core.RoutingType;
 
@@ -42,6 +50,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -129,6 +138,85 @@ public class DivertTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testCrossProtocol() throws Exception {
+      final String testForConvert = "testConvert";
+
+      final String testAddress = "testAddress";
+
+      final String forwardAddress = "forwardAddress";
+
+      DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).
+         setRoutingType(ComponentConfigurationRoutingType.ANYCAST);
+
+      Configuration config = createDefaultNettyConfig().addDivertConfiguration(divertConf);
+
+      ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false));
+
+      server.start();
+
+      final SimpleString queueName1 = SimpleString.toSimpleString(testAddress);
+
+      final SimpleString queueName2 = SimpleString.toSimpleString(forwardAddress);
+
+      { // this is setting up the queues
+         ServerLocator locator = createInVMNonHALocator();
+
+         ClientSessionFactory sf = createSessionFactory(locator);
+
+         ClientSession session = sf.createSession(false, true, true);
+
+         session.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName1, null, true);
+
+         session.createQueue(new SimpleString(testForConvert), RoutingType.ANYCAST, SimpleString.toSimpleString(testForConvert), null, true);
+
+         session.createQueue(new SimpleString(forwardAddress), RoutingType.MULTICAST, queueName2, null, true);
+      }
+
+      ConnectionFactory coreCF = CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+      Connection coreConnection = coreCF.createConnection();
+      Session coreSession = coreConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+      MessageProducer producerCore = coreSession.createProducer(coreSession.createQueue(testForConvert));
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage textMessage = coreSession.createTextMessage("text" + i);
+         //if (i % 2 == 0) textMessage.setIntProperty("key", i);
+         producerCore.send(textMessage);
+      }
+
+      producerCore.close();
+
+      ConnectionFactory amqpCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");
+
+      Connection amqpConnection = amqpCF.createConnection();
+      Session amqpSession = amqpConnection.createSession(Session.AUTO_ACKNOWLEDGE);
+      Queue amqpQueue = amqpSession.createQueue(testAddress);
+      MessageProducer producer = amqpSession.createProducer(amqpQueue);
+      MessageConsumer consumerFromConvert = amqpSession.createConsumer(amqpSession.createQueue(testForConvert));
+      amqpConnection.start();
+
+      for (int i = 0; i < 10; i++) {
+         javax.jms.Message received =  consumerFromConvert.receive(5000);
+         Assert.assertNotNull(received);
+         producer.send(received);
+      }
+
+
+      Queue outQueue = coreSession.createQueue(queueName2.toString());
+      MessageConsumer consumer = coreSession.createConsumer(outQueue);
+      coreConnection.start();
+
+      for (int i = 0; i < 10; i++) {
+         TextMessage textMessage = (TextMessage)consumer.receive(5000);
+         Assert.assertNotNull(textMessage);
+         Assert.assertEquals("text" + i, textMessage.getText());
+         //if (i % 2 == 0) Assert.assertEquals(i, textMessage.getIntProperty("key"));
+      }
+
+      Assert.assertNull(consumer.receiveNoWait());
+
+   }
+
+   @Test
    public void testSingleNonExclusiveDivertWithRoutingType() throws Exception {
       final String testAddress = "testAddress";