You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/04/13 17:23:33 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6244 -
reset ActiveMQBytesMessage 'compressed' flag after restoring compressed
content
Repository: activemq
Updated Branches:
refs/heads/activemq-5.13.x 3e2bd6d2a -> 7894d8a9a
https://issues.apache.org/jira/browse/AMQ-6244 - reset ActiveMQBytesMessage 'compressed' flag after restoring compressed content
(cherry picked from commit 7a61718e0255728b7a822b25fcf75a030be14359)
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7894d8a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7894d8a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7894d8a9
Branch: refs/heads/activemq-5.13.x
Commit: 7894d8a9a1f0465c0ff26360ac364b4dc797d254
Parents: 3e2bd6d
Author: Brian D. Johnson <br...@thejohnsonfamily.name>
Authored: Mon Apr 11 20:18:26 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Apr 13 15:22:32 2016 +0000
----------------------------------------------------------------------
.../activemq/command/ActiveMQBytesMessage.java | 27 +--
.../org/apache/activemq/bugs/AMQ6244Test.java | 179 +++++++++++++++++++
2 files changed, 193 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7894d8a9/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 8806028..f0aeb81 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -842,8 +842,9 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
if (this.content != null && this.content.length > 0) {
try {
ByteSequence toRestore = this.content;
- if (compressed) {
+ if (isCompressed()) {
toRestore = new ByteSequence(decompress(this.content));
+ compressed = false;
}
this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
@@ -866,20 +867,20 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
checkWriteOnlyBody();
if (dataIn == null) {
try {
- ByteSequence data = getContent();
- if (data == null) {
- data = new ByteSequence(new byte[] {}, 0, 0);
- }
- InputStream is = new ByteArrayInputStream(data);
- if (isCompressed()) {
- if (data.length != 0) {
- is = new ByteArrayInputStream(decompress(data));
+ ByteSequence data = getContent();
+ if (data == null) {
+ data = new ByteSequence(new byte[] {}, 0, 0);
+ }
+ InputStream is = new ByteArrayInputStream(data);
+ if (isCompressed()) {
+ if (data.length != 0) {
+ is = new ByteArrayInputStream(decompress(data));
+ }
+ } else {
+ length = data.getLength();
}
- } else {
- length = data.getLength();
- }
- dataIn = new DataInputStream(is);
+ dataIn = new DataInputStream(is);
} catch (IOException ioe) {
throw JMSExceptionSupport.create(ioe);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7894d8a9/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java
new file mode 100644
index 0000000..bfd2e65
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import com.google.common.base.Throwables;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.junit.EmbeddedActiveMQBroker;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.zip.DataFormatException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+public class AMQ6244Test {
+
+ public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
+
+ @Rule
+ public TestName name = new TestName();
+
+ @Rule
+ public EmbeddedActiveMQBroker brokerRule = new EmbeddedActiveMQBroker();
+
+ public AMQ6244Test() {
+ brokerRule.setBrokerName(this.getClass().getName());
+ }
+
+ @Test
+ public void bytesMsgCompressedFlagTest() throws Exception {
+ final ActiveMQConnection compressionEnabledConnection = createConnection(brokerRule.getVmURL(), true);
+ final ActiveMQConnection compressionDisabledConnection = createConnection(brokerRule.getVmURL(), false);
+
+ // Consumer (compression=false)
+ final Session consumerSession = compressionDisabledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = consumerSession.createQueue(name.getMethodName());
+ final MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+ // Producer (compression=false)
+ final Session compressionDisabledProducerSession = compressionDisabledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer compressionDisabledProducer = compressionDisabledProducerSession.createProducer(destination);
+
+ // Producer (compression=true)
+ final Session compressionEnabledProducerSession = compressionEnabledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer compressionEnabledProducer = compressionEnabledProducerSession.createProducer(destination);
+
+ try {
+ /*
+ * Publish a BytesMessage on the compressed connection
+ */
+ final ActiveMQBytesMessage originalCompressedMsg = (ActiveMQBytesMessage) compressionEnabledProducerSession.createBytesMessage();
+ originalCompressedMsg.writeBytes(ORIG_MSG_CONTENT);
+ Assert.assertFalse(originalCompressedMsg.isReadOnlyBody());
+
+ // send first message
+ compressionEnabledProducer.send(originalCompressedMsg);
+ Assert.assertEquals(
+ "Once sent, the Message's 'compressed' flag should match the 'useCompression' flag on the Producer's Connection",
+ compressionEnabledConnection.isUseCompression(), originalCompressedMsg.isCompressed());
+
+ /*
+ * Consume the compressed message and resend it decompressed
+ */
+ final ActiveMQBytesMessage compressedMsg = receiveMsg(consumer, originalCompressedMsg);
+ validateMsgContent(compressedMsg);
+
+ // make message writable so the client can reuse it
+ makeWritable(compressedMsg);
+ compressedMsg.setStringProperty(this.getClass().getName(), "test");
+ compressionDisabledProducer.send(compressedMsg);
+
+ /*
+ * AMQ-6244 ERROR STATE 1: Produced Message is marked 'compressed' when its contents are not compressed
+ */
+ Assert.assertEquals(
+ "AMQ-6244 Error State Achieved: Produced Message's 'compressed' flag is enabled after message is published on a connection with 'useCompression=false'",
+ compressionDisabledConnection.isUseCompression(), compressedMsg.isCompressed());
+
+ /*
+ * AMQ-6244 ERROR STATE 2: Consumer cannot handle Message marked 'compressed' when its contents are not compressed
+ */
+ try {
+ final ActiveMQBytesMessage uncompressedMsg = receiveMsg(consumer, compressedMsg);
+ validateMsgContent(uncompressedMsg);
+ } catch (JMSException jmsE) {
+ final Throwable rootCause = Throwables.getRootCause(jmsE);
+
+ if (rootCause instanceof DataFormatException || rootCause instanceof NegativeArraySizeException) {
+ final StringWriter sw = new StringWriter();
+ final PrintWriter pw = new PrintWriter(sw);
+
+ jmsE.printStackTrace(pw);
+
+ Assert.fail(
+ "AMQ-6244 Error State Achieved: Attempted to decompress BytesMessage contents that are not compressed\n" + sw
+ .toString());
+ } else {
+ throw jmsE;
+ }
+ }
+ } finally {
+ compressionEnabledProducerSession.close();
+ compressionEnabledConnection.close();
+ consumerSession.close();
+ compressionDisabledProducerSession.close();
+ compressionDisabledConnection.close();
+ }
+ }
+
+ private ActiveMQBytesMessage receiveMsg(final MessageConsumer consumer, final ActiveMQMessage sentMessage) throws JMSException {
+ // receive the message
+ final ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumer.receive();
+ Assert.assertNotNull(message);
+ Assert.assertTrue("Consumed Message should be read-only", message.isReadOnlyBody());
+ Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag",
+ sentMessage.isCompressed(), message.isCompressed());
+
+ return message;
+ }
+
+ private void validateMsgContent(final ActiveMQBytesMessage message) throws JMSException {
+ // ensure consumed message content matches what was originally set
+ final byte[] msgContent = new byte[(int) message.getBodyLength()];
+ message.readBytes(msgContent);
+
+ Assert.assertTrue("Consumed Message content should match the original Message content",
+ Arrays.equals(ORIG_MSG_CONTENT, msgContent));
+ }
+
+ protected static ActiveMQConnection createConnection(final String URL, final boolean useCompression) throws Exception {
+ final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
+ factory.setUseCompression(useCompression);
+ Connection connection = factory.createConnection();
+ connection.start();
+ return (ActiveMQConnection) connection;
+ }
+
+ protected static byte[] randomByteArray() {
+ final Random random = new Random();
+ final byte[] byteArray = new byte[random.nextInt(10 * 1024)];
+ random.nextBytes(byteArray);
+
+ return byteArray;
+ }
+
+ protected static void makeWritable(final ActiveMQMessage message) {
+ message.setReadOnlyBody(false);
+ message.setReadOnlyProperties(false);
+ }
+}