You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/10/19 12:33:30 UTC

qpid-broker-j git commit: QPID-7832: Fix infinite recursion when handling an AMQP1.0 message encoded in the version 0 format. Fixes regression introduced by 660c206deb352aca3694a6b31f5f7cf6fca70533

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 331c7276a -> e9e0e74e7


QPID-7832: Fix infinite recursion when handling an AMQP1.0 message encoded in the version 0 format.  Fixes regression introduced by 660c206deb352aca3694a6b31f5f7cf6fca70533

End to end test added.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e9e0e74e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e9e0e74e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e9e0e74e

Branch: refs/heads/master
Commit: e9e0e74e7b23587fbc384aaf03250a3c3e0c7624
Parents: 331c727
Author: Keith Wall <kw...@apache.org>
Authored: Thu Oct 19 13:23:57 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Oct 19 13:33:15 2017 +0100

----------------------------------------------------------------------
 .../berkeleydb/AbstractBDBMessageStore.java     |   7 +-
 .../test-store/00000000.jdb                     | Bin 0 -> 535122 bytes
 .../berkeleydb/BDBAMQP10V0UpgradeTest.java      | 127 +++++++++++++++++++
 .../qpid/server/store/StoredMemoryMessage.java  |   4 +
 .../apache/qpid/server/store/StoredMessage.java |   5 +
 .../qpid/server/protocol/v1_0/Message_1_0.java  |   4 +-
 .../store/jdbc/AbstractJDBCMessageStore.java    |   7 +-
 7 files changed, 151 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index c871508..10906ce 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1023,7 +1023,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
         @Override
         public synchronized QpidByteBuffer getContent(int offset, int length)
         {
-            return getContentAsByteBuffer().view(offset, length);
+            QpidByteBuffer contentAsByteBuffer = getContentAsByteBuffer();
+            if (length == Integer.MAX_VALUE)
+            {
+                length = contentAsByteBuffer.remaining();
+            }
+            return contentAsByteBuffer.view(offset, length);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb
----------------------------------------------------------------------
diff --git a/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb b/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb
new file mode 100644
index 0000000..a234247
Binary files /dev/null and b/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb differ

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
new file mode 100644
index 0000000..34c550c
--- /dev/null
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.MessageDigest;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+/**
+ *
+ * The store was formed with an Qpid JMS Client 0.26 and Qpid Broker v6.1.4 configured to use a BDB virtualhostnode
+ * and provided store.
+ *
+ * byte[] content = new byte[256*1024];
+ * IntStream.range(0, content.length).forEachOrdered(i -> content[i] = (byte) (i % 256));
+ * BytesMessage message = session.createBytesMessage();
+ * message.writeBytes(content);
+ * message.setStringProperty("sha256hash", DatatypeConverter.printHexBinary(MessageDigest.getInstance("SHA-256").digest(content)));
+ * messageProducer.send(message);
+ *
+ */
+public class BDBAMQP10V0UpgradeTest extends QpidBrokerTestCase
+{
+    private static final int EXPECTED_MESSAGE_LENGTH = 256 * 1024;
+
+    private String _storeLocation;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        _storeLocation = Files.createTempDirectory("qpid-work-" + getClassQualifiedTestName() + "-bdb-store").toString();
+        TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
+        brokerConfiguration.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, BDBVirtualHostNode.STORE_PATH, _storeLocation );
+
+        //Clear the two target directories if they exist.
+        File directory = new File(_storeLocation);
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+        directory.mkdirs();
+
+        // copy store files
+        InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb");
+        FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
+
+        super.setUp();
+    }
+
+    @Override
+    public void tearDown() throws Exception
+    {
+        try
+        {
+            super.tearDown();
+        }
+        finally
+        {
+            FileUtils.delete(new File(_storeLocation), true);
+        }
+    }
+
+    public void testRecoverAmpqV0Message() throws Exception
+    {
+        Connection connection = getConnection();
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = createTestQueue(session, "queue");
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        Message message = consumer.receive(getReceiveTimeout());
+        assertNotNull("Recovered message not received", message);
+        assertTrue(message instanceof BytesMessage);
+        BytesMessage bytesMessage = ((BytesMessage) message);
+
+        long length = bytesMessage.getBodyLength();
+        String expectedContentHash = message.getStringProperty("sha256hash");
+        byte[] content = new byte[(int) length];
+        bytesMessage.readBytes(content);
+
+        assertEquals("Unexpected content length", EXPECTED_MESSAGE_LENGTH, length);
+        assertNotNull("Message should carry expectedShaHash property", expectedContentHash);
+
+        String contentHash = computeContentHash(content);
+        assertEquals("Unexpected content hash", expectedContentHash, contentHash);
+        session.commit();
+    }
+
+    private String computeContentHash(final byte[] content) throws Exception
+    {
+        MessageDigest digest = MessageDigest.getInstance("SHA-256");
+        byte[] hash = digest.digest(content);
+        return DatatypeConverter.printHexBinary(hash);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index f4de0d0..8538a95 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -83,6 +83,10 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
 
         try (QpidByteBuffer combined = QpidByteBuffer.concatenate(_content))
         {
+            if (length == Integer.MAX_VALUE)
+            {
+                length = combined.remaining();
+            }
             return combined.view(offset, length);
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 98b54bd..a1be172 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -28,6 +28,11 @@ public interface StoredMessage<M extends StorableMessageMetaData>
 
     long getMessageNumber();
 
+    /**
+     * Returns length bytes of message content beginning from the given offset.  Caller is responsible
+     * for the disposal of the returned buffer.  If length is {@link Integer#MAX_VALUE}, length is not
+     * constrained.
+     */
     QpidByteBuffer getContent(int offset, int length);
 
     int getContentSize();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index e02f24a..f33dfe3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -181,7 +181,9 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
             try
             {
                 List<EncodingRetainingSection<?>> sections;
-                try (QpidByteBuffer allSectionsContent = super.getContent())
+                // The v0 message format put all sections within the content, so we need to read all the stored content
+                // not just #getSize()
+                try (QpidByteBuffer allSectionsContent = super.getContent(0, Integer.MAX_VALUE))
                 {
                     sections = sectionDecoder.parseAll(allSectionsContent);
                 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 9c6cfa9..aeff70e 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1435,7 +1435,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
         @Override
         public synchronized QpidByteBuffer getContent(int offset, int length)
         {
-            return getContentAsByteBuffer().view(offset, length);
+            QpidByteBuffer contentAsByteBuffer = getContentAsByteBuffer();
+            if (length == Integer.MAX_VALUE)
+            {
+                length = contentAsByteBuffer.remaining();
+            }
+            return contentAsByteBuffer.view(offset, length);
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org