You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2016/04/13 17:23:33 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6244 - reset ActiveMQBytesMessage 'compressed' flag after restoring compressed content

Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 3e2bd6d2a -> 7894d8a9a


https://issues.apache.org/jira/browse/AMQ-6244 - reset ActiveMQBytesMessage 'compressed' flag after restoring compressed content

(cherry picked from commit 7a61718e0255728b7a822b25fcf75a030be14359)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7894d8a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7894d8a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7894d8a9

Branch: refs/heads/activemq-5.13.x
Commit: 7894d8a9a1f0465c0ff26360ac364b4dc797d254
Parents: 3e2bd6d
Author: Brian D. Johnson <br...@thejohnsonfamily.name>
Authored: Mon Apr 11 20:18:26 2016 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Wed Apr 13 15:22:32 2016 +0000

----------------------------------------------------------------------
 .../activemq/command/ActiveMQBytesMessage.java  |  27 +--
 .../org/apache/activemq/bugs/AMQ6244Test.java   | 179 +++++++++++++++++++
 2 files changed, 193 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7894d8a9/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 8806028..f0aeb81 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -842,8 +842,9 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
         if (this.content != null && this.content.length > 0) {
             try {
                 ByteSequence toRestore = this.content;
-                if (compressed) {
+                if (isCompressed()) {
                     toRestore = new ByteSequence(decompress(this.content));
+                    compressed = false;
                 }
 
                 this.dataOut.write(toRestore.getData(), toRestore.getOffset(), toRestore.getLength());
@@ -866,20 +867,20 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
         checkWriteOnlyBody();
         if (dataIn == null) {
             try {
-            ByteSequence data = getContent();
-            if (data == null) {
-                data = new ByteSequence(new byte[] {}, 0, 0);
-            }
-            InputStream is = new ByteArrayInputStream(data);
-            if (isCompressed()) {
-                if (data.length != 0) {
-                    is = new ByteArrayInputStream(decompress(data));
+                ByteSequence data = getContent();
+                if (data == null) {
+                    data = new ByteSequence(new byte[] {}, 0, 0);
+                }
+                InputStream is = new ByteArrayInputStream(data);
+                if (isCompressed()) {
+                    if (data.length != 0) {
+                        is = new ByteArrayInputStream(decompress(data));
+                    }
+                } else {
+                    length = data.getLength();
                 }
-            } else {
-                length = data.getLength();
-            }
 
-            dataIn = new DataInputStream(is);
+                dataIn = new DataInputStream(is);
             } catch (IOException ioe) {
                 throw JMSExceptionSupport.create(ioe);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7894d8a9/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java
new file mode 100644
index 0000000..bfd2e65
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6244Test.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import com.google.common.base.Throwables;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.junit.EmbeddedActiveMQBroker;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.zip.DataFormatException;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+public class AMQ6244Test {
+
+    public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
+
+    @Rule
+    public TestName name = new TestName();
+
+    @Rule
+    public EmbeddedActiveMQBroker brokerRule = new EmbeddedActiveMQBroker();
+
+    public AMQ6244Test() {
+        brokerRule.setBrokerName(this.getClass().getName());
+    }
+
+    @Test
+    public void bytesMsgCompressedFlagTest() throws Exception {
+        final ActiveMQConnection compressionEnabledConnection = createConnection(brokerRule.getVmURL(), true);
+        final ActiveMQConnection compressionDisabledConnection = createConnection(brokerRule.getVmURL(), false);
+
+        // Consumer (compression=false)
+        final Session consumerSession = compressionDisabledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue destination = consumerSession.createQueue(name.getMethodName());
+        final MessageConsumer consumer = consumerSession.createConsumer(destination);
+
+        // Producer (compression=false)
+        final Session compressionDisabledProducerSession = compressionDisabledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageProducer compressionDisabledProducer = compressionDisabledProducerSession.createProducer(destination);
+
+        // Producer (compression=true)
+        final Session compressionEnabledProducerSession = compressionEnabledConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final MessageProducer compressionEnabledProducer = compressionEnabledProducerSession.createProducer(destination);
+
+        try {
+            /*
+             * Publish a BytesMessage on the compressed connection
+             */
+            final ActiveMQBytesMessage originalCompressedMsg = (ActiveMQBytesMessage) compressionEnabledProducerSession.createBytesMessage();
+            originalCompressedMsg.writeBytes(ORIG_MSG_CONTENT);
+            Assert.assertFalse(originalCompressedMsg.isReadOnlyBody());
+
+            // send first message
+            compressionEnabledProducer.send(originalCompressedMsg);
+            Assert.assertEquals(
+                    "Once sent, the Message's 'compressed' flag should match the 'useCompression' flag on the Producer's Connection",
+                    compressionEnabledConnection.isUseCompression(), originalCompressedMsg.isCompressed());
+
+            /*
+             * Consume the compressed message and resend it decompressed
+             */
+            final ActiveMQBytesMessage compressedMsg = receiveMsg(consumer, originalCompressedMsg);
+            validateMsgContent(compressedMsg);
+
+            // make message writable so the client can reuse it
+            makeWritable(compressedMsg);
+            compressedMsg.setStringProperty(this.getClass().getName(), "test");
+            compressionDisabledProducer.send(compressedMsg);
+
+            /*
+             * AMQ-6244 ERROR STATE 1: Produced Message is marked 'compressed' when its contents are not compressed
+             */
+            Assert.assertEquals(
+                    "AMQ-6244 Error State Achieved: Produced Message's 'compressed' flag is enabled after message is published on a connection with 'useCompression=false'",
+                    compressionDisabledConnection.isUseCompression(), compressedMsg.isCompressed());
+
+            /*
+             * AMQ-6244 ERROR STATE 2: Consumer cannot handle Message marked 'compressed' when its contents are not compressed
+             */
+            try {
+                final ActiveMQBytesMessage uncompressedMsg = receiveMsg(consumer, compressedMsg);
+                validateMsgContent(uncompressedMsg);
+            } catch (JMSException jmsE) {
+                final Throwable rootCause = Throwables.getRootCause(jmsE);
+
+                if (rootCause instanceof DataFormatException || rootCause instanceof NegativeArraySizeException) {
+                    final StringWriter sw = new StringWriter();
+                    final PrintWriter pw = new PrintWriter(sw);
+
+                    jmsE.printStackTrace(pw);
+
+                    Assert.fail(
+                            "AMQ-6244 Error State Achieved: Attempted to decompress BytesMessage contents that are not compressed\n" + sw
+                                    .toString());
+                } else {
+                    throw jmsE;
+                }
+            }
+        } finally {
+            compressionEnabledProducerSession.close();
+            compressionEnabledConnection.close();
+            consumerSession.close();
+            compressionDisabledProducerSession.close();
+            compressionDisabledConnection.close();
+        }
+    }
+
+    private ActiveMQBytesMessage receiveMsg(final MessageConsumer consumer, final ActiveMQMessage sentMessage) throws JMSException {
+        // receive the message
+        final ActiveMQBytesMessage message = (ActiveMQBytesMessage) consumer.receive();
+        Assert.assertNotNull(message);
+        Assert.assertTrue("Consumed Message should be read-only", message.isReadOnlyBody());
+        Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag",
+                            sentMessage.isCompressed(), message.isCompressed());
+
+        return message;
+    }
+
+    private void validateMsgContent(final ActiveMQBytesMessage message) throws JMSException {
+        // ensure consumed message content matches what was originally set
+        final byte[] msgContent = new byte[(int) message.getBodyLength()];
+        message.readBytes(msgContent);
+
+        Assert.assertTrue("Consumed Message content should match the original Message content",
+                          Arrays.equals(ORIG_MSG_CONTENT, msgContent));
+    }
+
+    protected static ActiveMQConnection createConnection(final String URL, final boolean useCompression) throws Exception {
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
+        factory.setUseCompression(useCompression);
+        Connection connection = factory.createConnection();
+        connection.start();
+        return (ActiveMQConnection) connection;
+    }
+
+    protected static byte[] randomByteArray() {
+        final Random random = new Random();
+        final byte[] byteArray = new byte[random.nextInt(10 * 1024)];
+        random.nextBytes(byteArray);
+
+        return byteArray;
+    }
+
+    protected static void makeWritable(final ActiveMQMessage message) {
+        message.setReadOnlyBody(false);
+        message.setReadOnlyProperties(false);
+    }
+}