You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2014/12/18 04:17:41 UTC

[8/8] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5381

https://issues.apache.org/jira/browse/AMQ-5381

Apply fix and test for error when restoring old content and treating it
as compressed when it was not.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e3d218a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e3d218a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e3d218a9

Branch: refs/heads/activemq-5.10.x
Commit: e3d218a97ab3f502366b7cc1fc62fc212a5591c5
Parents: c52c4ed
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 21 15:52:10 2014 -0400
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 17 22:00:52 2014 -0500

----------------------------------------------------------------------
 .../activemq/command/ActiveMQBytesMessage.java  |  12 +-
 .../org/apache/activemq/bugs/AMQ5381Test.java   | 182 +++++++++++++++++++
 2 files changed, 185 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e3d218a9/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
index 65e1036..8806028 100755
--- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQBytesMessage.java
@@ -19,15 +19,12 @@ package org.apache.activemq.command;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
-import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
 import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -131,7 +128,9 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
                 dataOut.close();
                 ByteSequence bs = bytesOut.toByteSequence();
                 setContent(bs);
-                if (compressed) {
+
+                ActiveMQConnection connection = getConnection();
+                if (connection != null && connection.isUseCompression()) {
                     doCompress();
                 }
             } catch (IOException ioe) {
@@ -834,11 +833,6 @@ public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessag
             this.dataOut = new DataOutputStream(os);
         }
 
-        ActiveMQConnection connection = getConnection();
-        if (connection != null && connection.isUseCompression()) {
-            compressed = true;
-        }
-
         restoreOldContent();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/e3d218a9/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
new file mode 100644
index 0000000..ff10b0d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5381Test.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Arrays;
+import java.util.Random;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class AMQ5381Test {
+
+    public static final byte[] ORIG_MSG_CONTENT = randomByteArray();
+    public static final String AMQ5381_EXCEPTION_MESSAGE = "java.util.zip.DataFormatException: incorrect header check";
+
+    private BrokerService brokerService;
+    private String brokerURI;
+
+    @Rule public TestName name = new TestName();
+
+    @Before
+    public void startBroker() throws Exception {
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setUseJmx(false);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        brokerURI = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    private ActiveMQConnection createConnection(boolean useCompression) throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURI);
+        factory.setUseCompression(useCompression);
+        Connection connection = factory.createConnection();
+        connection.start();
+        return (ActiveMQConnection) connection;
+    }
+
+    @Test
+    public void amq5381Test() throws Exception {
+
+        // Consumer Configured for (useCompression=true)
+        final ActiveMQConnection consumerConnection = createConnection(true);
+        final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue consumerQueue = consumerSession.createQueue(name.getMethodName());
+        final MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
+
+        // Producer Configured for (useCompression=false)
+        final ActiveMQConnection producerConnection = createConnection(false);
+        final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue producerQueue = producerSession.createQueue(name.getMethodName());
+
+        try {
+
+            final ActiveMQBytesMessage messageProduced = (ActiveMQBytesMessage) producerSession.createBytesMessage();
+            messageProduced.writeBytes(ORIG_MSG_CONTENT);
+            Assert.assertFalse(messageProduced.isReadOnlyBody());
+
+            Assert.assertFalse(
+                "Produced Message's 'compressed' flag should remain false until the message is sent (where it will be compressed, if necessary)",
+                messageProduced.isCompressed());
+
+            final MessageProducer producer = producerSession.createProducer(null);
+            producer.send(producerQueue, messageProduced);
+
+            Assert.assertEquals("Once sent, the produced Message's 'compressed' flag should match its Connection's 'useCompression' flag",
+                producerConnection.isUseCompression(), messageProduced.isCompressed());
+
+            final ActiveMQBytesMessage messageConsumed = (ActiveMQBytesMessage) consumer.receive();
+            Assert.assertNotNull(messageConsumed);
+            Assert.assertTrue("Consumed Message should be read-only", messageConsumed.isReadOnlyBody());
+            Assert.assertEquals("Consumed Message's 'compressed' flag should match the produced Message's 'compressed' flag",
+                                messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+            // ensure consumed message content matches what was originally set
+            final byte[] consumedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+            messageConsumed.readBytes(consumedMsgContent);
+
+            Assert.assertTrue("Consumed Message content should match the original Message content", Arrays.equals(ORIG_MSG_CONTENT, consumedMsgContent));
+
+            // make message writable so the consumer can modify and reuse it
+            makeWritable(messageConsumed);
+
+            // modify message, attempt to trigger DataFormatException due
+            // to old incorrect compression logic
+            try {
+                messageConsumed.setStringProperty(this.getClass().getName(), "test");
+            } catch (JMSException jmsE) {
+                if (AMQ5381_EXCEPTION_MESSAGE.equals(jmsE.getMessage())) {
+                    StringWriter sw = new StringWriter();
+                    PrintWriter pw = new PrintWriter(sw);
+                    jmsE.printStackTrace(pw);
+
+                    Assert.fail("AMQ5381 Error State Achieved: attempted to decompress BytesMessage contents that are not compressed\n" + sw.toString());
+                } else {
+                    throw jmsE;
+                }
+            }
+
+            Assert.assertEquals(
+                "The consumed Message's 'compressed' flag should still match the produced Message's 'compressed' flag after it has been made writable",
+                messageProduced.isCompressed(), messageConsumed.isCompressed());
+
+            // simulate re-publishing message
+            simulatePublish(messageConsumed);
+
+            // ensure consumed message content matches what was originally set
+            final byte[] modifiedMsgContent = new byte[(int) messageConsumed.getBodyLength()];
+            messageConsumed.readBytes(modifiedMsgContent);
+
+            Assert.assertTrue(
+                "After the message properties are modified and it is re-published, its message content should still match the original message content",
+                Arrays.equals(ORIG_MSG_CONTENT, modifiedMsgContent));
+        } finally {
+            producerSession.close();
+            producerConnection.close();
+            consumerSession.close();
+            consumerConnection.close();
+        }
+    }
+
+    protected static final int MAX_RANDOM_BYTE_ARRAY_SIZE_KB = 128;
+
+    protected static byte[] randomByteArray() {
+        final Random random = new Random();
+        final byte[] byteArray = new byte[random.nextInt(MAX_RANDOM_BYTE_ARRAY_SIZE_KB * 1024)];
+        random.nextBytes(byteArray);
+
+        return byteArray;
+    }
+
+    protected static void makeWritable(final ActiveMQMessage message) {
+        message.setReadOnlyBody(false);
+        message.setReadOnlyProperties(false);
+    }
+
+    protected static void simulatePublish(final ActiveMQBytesMessage message) throws JMSException {
+        message.reset();
+        message.onSend();
+    }
+}