You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/19 00:40:49 UTC

[activemq-artemis] branch master updated: ARTEMIS-2454 Message Body damaged after re-encoding

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f75f68  ARTEMIS-2454 Message Body damaged after re-encoding
     new 60e5bf9  This closes #2801
5f75f68 is described below

commit 5f75f68129f74d533a59b802112567b747352950
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sun Aug 18 14:11:06 2019 -0400

    ARTEMIS-2454 Message Body damaged after re-encoding
---
 .../apache/activemq/artemis/utils/ByteUtil.java    | 30 +++++++-
 .../artemis/protocol/amqp/broker/AMQPMessage.java  |  7 ++
 .../artemis/protocol/amqp/util/NettyReadable.java  |  2 +-
 .../protocol/amqp/converter/TestConversions.java   | 89 ++++++++++++++++++++++
 4 files changed, 125 insertions(+), 3 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index f34ce1a..12df5e4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.utils;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.nio.ByteBuffer;
 import java.nio.ReadOnlyBufferException;
 import java.util.Arrays;
@@ -62,7 +64,7 @@ public class ByteUtil {
       StringBuffer buffer = new StringBuffer();
 
       int line = 1;
-      buffer.append("/*  1 */ \"");
+      buffer.append("/*  0 */ \"");
       for (int i = 0; i < str.length(); i += groupSize) {
          buffer.append(str.substring(i, i + Math.min(str.length() - i, groupSize)));
 
@@ -72,7 +74,7 @@ public class ByteUtil {
             if (line < 10) {
                buffer.append(" ");
             }
-            buffer.append(Integer.toString(line) + " */ \"");
+            buffer.append(Integer.toString(i) + " */ \"");
          } else if ((i + groupSize) % groupSize == 0 && str.length() - i > groupSize) {
             buffer.append("\" + \"");
          }
@@ -102,6 +104,30 @@ public class ByteUtil {
       return new String(hexChars);
    }
 
+   /** Simplify reading of a byte array in a programmers understable way */
+   public static String debugByteArray(byte[] byteArray) {
+      StringWriter builder = new StringWriter();
+      PrintWriter writer = new PrintWriter(builder);
+      for (int i = 0; i < byteArray.length; i++) {
+         writer.print("\t[" + i + "]=" + ByteUtil.byteToChar(byteArray[i]) + " / " + byteArray[i]);
+         if (i > 0 && i % 8 == 0) {
+            writer.println();
+         } else {
+            writer.print(" ");
+         }
+      }
+      return builder.toString();
+   }
+
+
+   public static String byteToChar(byte value) {
+      char[] hexChars = new char[2];
+      int v = value & 0xFF;
+      hexChars[0] = hexArray[v >>> 4];
+      hexChars[1] = hexArray[v & 0x0F];
+      return new String(hexChars);
+   }
+
    public static String bytesToHex(byte[] bytes, int groupSize) {
       if (bytes == null) {
          return "NULL";
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 540d277..69976d6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
 import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
 import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.ByteUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
 import org.apache.qpid.proton.amqp.Binary;
@@ -295,6 +296,12 @@ public class AMQPMessage extends RefCountMessage {
       return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
    }
 
+   /** This is different from toString, as this will print an expanded version of the buffer
+    *  in Hex and programmers's readable format */
+   public String toDebugString() {
+      return ByteUtil.debugByteArray(data.array());
+   }
+
    /**
     * Retrieves the AMQP Section that composes the body of this message by decoding a
     * fresh copy from the encoded message data.  Changes to the returned value are not
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
index 096d4a6..2d7bf55 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
@@ -151,7 +151,7 @@ public class NettyReadable implements ReadableBuffer {
 
    @Override
    public int arrayOffset() {
-      return buffer.arrayOffset() + buffer.readerIndex();
+      return buffer.arrayOffset();
    }
 
    @Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 74f399e..d3976a9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.converter;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -37,8 +39,10 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.message.Message;
 import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.jboss.logging.Logger;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -47,6 +51,8 @@ import io.netty.buffer.Unpooled;
 
 public class TestConversions extends Assert {
 
+   private static final Logger logger = Logger.getLogger(TestConversions.class);
+
    @Test
    public void testAmqpValueOfBooleanIsPassedThrough() throws Exception {
       Map<String, Object> mapprop = createPropertiesMap();
@@ -226,6 +232,89 @@ public class TestConversions extends Assert {
       assertEquals(text, textMessage.getText());
    }
 
+   @Test
+   public void testEditAndConvert() throws Exception {
+
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      properties.getValue().put("hello", "hello");
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      message.setMessageAnnotations(annotations);
+      message.setApplicationProperties(properties);
+
+      String text = "someText";
+      message.setBody(new AmqpValue(text));
+
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+      TypedProperties extraProperties = new TypedProperties();
+      encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
+
+      for (int i = 0; i < 10; i++) {
+         if (logger.isDebugEnabled()) {
+            logger.debug("Message encoded :: " + encodedMessage.toDebugString());
+         }
+
+         encodedMessage.messageChanged();
+         AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
+         Assert.assertEquals(text, (String)value.getValue());
+
+         // this line is needed to force a failure
+         ICoreMessage coreMessage = encodedMessage.toCore();
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("Converted message: " + coreMessage);
+         }
+      }
+   }
+
+   @Test
+   public void testExpandPropertiesAndConvert() throws Exception {
+
+      Map<String, Object> mapprop = createPropertiesMap();
+      ApplicationProperties properties = new ApplicationProperties(mapprop);
+      properties.getValue().put("hello", "hello");
+      MessageImpl message = (MessageImpl) Message.Factory.create();
+      MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+      message.setMessageAnnotations(annotations);
+      message.setApplicationProperties(properties);
+
+      String text = "someText";
+      message.setBody(new AmqpValue(text));
+
+      AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+      TypedProperties extraProperties = new TypedProperties();
+      encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
+
+      for (int i = 0; i < 100; i++) {
+         encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
+         encodedMessage.messageChanged();
+         encodedMessage.reencode();
+         AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
+         Assert.assertEquals(text, (String)value.getValue());
+         ICoreMessage coreMessage = encodedMessage.toCore();
+         if (logger.isDebugEnabled()) {
+            logger.debug("Converted message: " + coreMessage);
+         }
+
+         // I'm going to replace the message every 10 messages by a re-encoded version to check if the wiring still acturate.
+         // I want to mix replacing and not replacing to make sure the re-encoding is not giving me any surprises
+         if (i > 0 && i % 10 == 0) {
+            ByteBuf buf = Unpooled.buffer(15 * 1024, 150 * 1024);
+            encodedMessage.sendBuffer(buf, 1);
+            byte[] messageBytes = new byte[buf.writerIndex()];
+            buf.readBytes(messageBytes);
+
+            message = (MessageImpl) Message.Factory.create();
+            message.decode(ByteBuffer.wrap(messageBytes));
+            // This is replacing the message by the new expanded version
+            encodedMessage = encodeAndCreateAMQPMessage(message);
+
+         }
+
+      }
+   }
+
    private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
       NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
       message.encode(encoded);