You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/19 00:40:49 UTC
[activemq-artemis] branch master updated: ARTEMIS-2454 Message Body
damaged after re-encoding
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 5f75f68 ARTEMIS-2454 Message Body damaged after re-encoding
new 60e5bf9 This closes #2801
5f75f68 is described below
commit 5f75f68129f74d533a59b802112567b747352950
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sun Aug 18 14:11:06 2019 -0400
ARTEMIS-2454 Message Body damaged after re-encoding
---
.../apache/activemq/artemis/utils/ByteUtil.java | 30 +++++++-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 7 ++
.../artemis/protocol/amqp/util/NettyReadable.java | 2 +-
.../protocol/amqp/converter/TestConversions.java | 89 ++++++++++++++++++++++
4 files changed, 125 insertions(+), 3 deletions(-)
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
index f34ce1a..12df5e4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ByteUtil.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.utils;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;
import java.util.Arrays;
@@ -62,7 +64,7 @@ public class ByteUtil {
StringBuffer buffer = new StringBuffer();
int line = 1;
- buffer.append("/* 1 */ \"");
+ buffer.append("/* 0 */ \"");
for (int i = 0; i < str.length(); i += groupSize) {
buffer.append(str.substring(i, i + Math.min(str.length() - i, groupSize)));
@@ -72,7 +74,7 @@ public class ByteUtil {
if (line < 10) {
buffer.append(" ");
}
- buffer.append(Integer.toString(line) + " */ \"");
+ buffer.append(Integer.toString(i) + " */ \"");
} else if ((i + groupSize) % groupSize == 0 && str.length() - i > groupSize) {
buffer.append("\" + \"");
}
@@ -102,6 +104,30 @@ public class ByteUtil {
return new String(hexChars);
}
+ /** Simplify reading of a byte array in a programmers understable way */
+ public static String debugByteArray(byte[] byteArray) {
+ StringWriter builder = new StringWriter();
+ PrintWriter writer = new PrintWriter(builder);
+ for (int i = 0; i < byteArray.length; i++) {
+ writer.print("\t[" + i + "]=" + ByteUtil.byteToChar(byteArray[i]) + " / " + byteArray[i]);
+ if (i > 0 && i % 8 == 0) {
+ writer.println();
+ } else {
+ writer.print(" ");
+ }
+ }
+ return builder.toString();
+ }
+
+
+ public static String byteToChar(byte value) {
+ char[] hexChars = new char[2];
+ int v = value & 0xFF;
+ hexChars[0] = hexArray[v >>> 4];
+ hexChars[1] = hexArray[v & 0x0F];
+ return new String(hexChars);
+ }
+
public static String bytesToHex(byte[] bytes, int groupSize) {
if (bytes == null) {
return "NULL";
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 540d277..69976d6 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
+import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.qpid.proton.amqp.Binary;
@@ -295,6 +296,12 @@ public class AMQPMessage extends RefCountMessage {
return scanForMessageSection(applicationPropertiesPosition, ApplicationProperties.class);
}
+ /** This is different from toString, as this will print an expanded version of the buffer
+ * in Hex and programmers's readable format */
+ public String toDebugString() {
+ return ByteUtil.debugByteArray(data.array());
+ }
+
/**
* Retrieves the AMQP Section that composes the body of this message by decoding a
* fresh copy from the encoded message data. Changes to the returned value are not
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
index 096d4a6..2d7bf55 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/NettyReadable.java
@@ -151,7 +151,7 @@ public class NettyReadable implements ReadableBuffer {
@Override
public int arrayOffset() {
- return buffer.arrayOffset() + buffer.readerIndex();
+ return buffer.arrayOffset();
}
@Override
diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
index 74f399e..d3976a9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.artemis.protocol.amqp.converter;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -37,8 +39,10 @@ import org.apache.qpid.proton.amqp.messaging.AmqpSequence;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
+import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
@@ -47,6 +51,8 @@ import io.netty.buffer.Unpooled;
public class TestConversions extends Assert {
+ private static final Logger logger = Logger.getLogger(TestConversions.class);
+
@Test
public void testAmqpValueOfBooleanIsPassedThrough() throws Exception {
Map<String, Object> mapprop = createPropertiesMap();
@@ -226,6 +232,89 @@ public class TestConversions extends Assert {
assertEquals(text, textMessage.getText());
}
+ @Test
+ public void testEditAndConvert() throws Exception {
+
+ Map<String, Object> mapprop = createPropertiesMap();
+ ApplicationProperties properties = new ApplicationProperties(mapprop);
+ properties.getValue().put("hello", "hello");
+ MessageImpl message = (MessageImpl) Message.Factory.create();
+ MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+ message.setMessageAnnotations(annotations);
+ message.setApplicationProperties(properties);
+
+ String text = "someText";
+ message.setBody(new AmqpValue(text));
+
+ AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+ TypedProperties extraProperties = new TypedProperties();
+ encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
+
+ for (int i = 0; i < 10; i++) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Message encoded :: " + encodedMessage.toDebugString());
+ }
+
+ encodedMessage.messageChanged();
+ AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
+ Assert.assertEquals(text, (String)value.getValue());
+
+ // this line is needed to force a failure
+ ICoreMessage coreMessage = encodedMessage.toCore();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Converted message: " + coreMessage);
+ }
+ }
+ }
+
+ @Test
+ public void testExpandPropertiesAndConvert() throws Exception {
+
+ Map<String, Object> mapprop = createPropertiesMap();
+ ApplicationProperties properties = new ApplicationProperties(mapprop);
+ properties.getValue().put("hello", "hello");
+ MessageImpl message = (MessageImpl) Message.Factory.create();
+ MessageAnnotations annotations = new MessageAnnotations(new HashMap<>());
+ message.setMessageAnnotations(annotations);
+ message.setApplicationProperties(properties);
+
+ String text = "someText";
+ message.setBody(new AmqpValue(text));
+
+ AMQPMessage encodedMessage = encodeAndCreateAMQPMessage(message);
+ TypedProperties extraProperties = new TypedProperties();
+ encodedMessage.setAddress(SimpleString.toSimpleString("xxxx.v1.queue"));
+
+ for (int i = 0; i < 100; i++) {
+ encodedMessage.getApplicationProperties().getValue().put("another" + i, "value" + i);
+ encodedMessage.messageChanged();
+ encodedMessage.reencode();
+ AmqpValue value = (AmqpValue)encodedMessage.getProtonMessage().getBody();
+ Assert.assertEquals(text, (String)value.getValue());
+ ICoreMessage coreMessage = encodedMessage.toCore();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Converted message: " + coreMessage);
+ }
+
+ // I'm going to replace the message every 10 messages by a re-encoded version to check if the wiring still acturate.
+ // I want to mix replacing and not replacing to make sure the re-encoding is not giving me any surprises
+ if (i > 0 && i % 10 == 0) {
+ ByteBuf buf = Unpooled.buffer(15 * 1024, 150 * 1024);
+ encodedMessage.sendBuffer(buf, 1);
+ byte[] messageBytes = new byte[buf.writerIndex()];
+ buf.readBytes(messageBytes);
+
+ message = (MessageImpl) Message.Factory.create();
+ message.decode(ByteBuffer.wrap(messageBytes));
+ // This is replacing the message by the new expanded version
+ encodedMessage = encodeAndCreateAMQPMessage(message);
+
+ }
+
+ }
+ }
+
private AMQPMessage encodeAndCreateAMQPMessage(MessageImpl message) {
NettyWritable encoded = new NettyWritable(Unpooled.buffer(1024));
message.encode(encoded);