You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/10/19 12:33:30 UTC
qpid-broker-j git commit: QPID-7832: Fix infinite recursion when
handling an AMQP1.0 message encoded in the version 0 format. Fixes regression
introduced by 660c206deb352aca3694a6b31f5f7cf6fca70533
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 331c7276a -> e9e0e74e7
QPID-7832: Fix infinite recursion when handling an AMQP1.0 message encoded in the version 0 format. Fixes regression introduced by 660c206deb352aca3694a6b31f5f7cf6fca70533
End to end test added.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/e9e0e74e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/e9e0e74e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/e9e0e74e
Branch: refs/heads/master
Commit: e9e0e74e7b23587fbc384aaf03250a3c3e0c7624
Parents: 331c727
Author: Keith Wall <kw...@apache.org>
Authored: Thu Oct 19 13:23:57 2017 +0100
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Oct 19 13:33:15 2017 +0100
----------------------------------------------------------------------
.../berkeleydb/AbstractBDBMessageStore.java | 7 +-
.../test-store/00000000.jdb | Bin 0 -> 535122 bytes
.../berkeleydb/BDBAMQP10V0UpgradeTest.java | 127 +++++++++++++++++++
.../qpid/server/store/StoredMemoryMessage.java | 4 +
.../apache/qpid/server/store/StoredMessage.java | 5 +
.../qpid/server/protocol/v1_0/Message_1_0.java | 4 +-
.../store/jdbc/AbstractJDBCMessageStore.java | 7 +-
7 files changed, 151 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
----------------------------------------------------------------------
diff --git a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index c871508..10906ce 100644
--- a/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -1023,7 +1023,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore
@Override
public synchronized QpidByteBuffer getContent(int offset, int length)
{
- return getContentAsByteBuffer().view(offset, length);
+ QpidByteBuffer contentAsByteBuffer = getContentAsByteBuffer();
+ if (length == Integer.MAX_VALUE)
+ {
+ length = contentAsByteBuffer.remaining();
+ }
+ return contentAsByteBuffer.view(offset, length);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb
----------------------------------------------------------------------
diff --git a/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb b/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb
new file mode 100644
index 0000000..a234247
Binary files /dev/null and b/bdbstore/src/test/resources/upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb differ
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
new file mode 100644
index 0000000..34c550c
--- /dev/null
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBAMQP10V0UpgradeTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.security.MessageDigest;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBVirtualHostNode;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TestBrokerConfiguration;
+import org.apache.qpid.util.FileUtils;
+
+/**
+ *
+ * The store was formed with a Qpid JMS Client 0.26 and Qpid Broker v6.1.4 configured to use a BDB virtualhostnode
+ * and provided store.
+ *
+ * byte[] content = new byte[256*1024];
+ * IntStream.range(0, content.length).forEachOrdered(i -> content[i] = (byte) (i % 256));
+ * BytesMessage message = session.createBytesMessage();
+ * message.writeBytes(content);
+ * message.setStringProperty("sha256hash", DatatypeConverter.printHexBinary(MessageDigest.getInstance("SHA-256").digest(content)));
+ * messageProducer.send(message);
+ *
+ */
+public class BDBAMQP10V0UpgradeTest extends QpidBrokerTestCase
+{
+ private static final int EXPECTED_MESSAGE_LENGTH = 256 * 1024;
+
+ private String _storeLocation;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ _storeLocation = Files.createTempDirectory("qpid-work-" + getClassQualifiedTestName() + "-bdb-store").toString();
+ TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
+ brokerConfiguration.setObjectAttribute(VirtualHostNode.class, TestBrokerConfiguration.ENTRY_NAME_VIRTUAL_HOST, BDBVirtualHostNode.STORE_PATH, _storeLocation );
+
+ //Clear the target store directory if it exists.
+ File directory = new File(_storeLocation);
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+ directory.mkdirs();
+
+ // copy store files
+ InputStream src = getClass().getClassLoader().getResourceAsStream("upgrade/bdbstore-v9-amqp10v0/test-store/00000000.jdb");
+ FileUtils.copy(src, new File(_storeLocation, "00000000.jdb"));
+
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ try
+ {
+ super.tearDown();
+ }
+ finally
+ {
+ FileUtils.delete(new File(_storeLocation), true);
+ }
+ }
+
+ public void testRecoverAmpqV0Message() throws Exception
+ {
+ Connection connection = getConnection();
+ connection.start();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = createTestQueue(session, "queue");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(getReceiveTimeout());
+ assertNotNull("Recovered message not received", message);
+ assertTrue(message instanceof BytesMessage);
+ BytesMessage bytesMessage = ((BytesMessage) message);
+
+ long length = bytesMessage.getBodyLength();
+ String expectedContentHash = message.getStringProperty("sha256hash");
+ byte[] content = new byte[(int) length];
+ bytesMessage.readBytes(content);
+
+ assertEquals("Unexpected content length", EXPECTED_MESSAGE_LENGTH, length);
+ assertNotNull("Message should carry expectedShaHash property", expectedContentHash);
+
+ String contentHash = computeContentHash(content);
+ assertEquals("Unexpected content hash", expectedContentHash, contentHash);
+ session.commit();
+ }
+
+ private String computeContentHash(final byte[] content) throws Exception
+ {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ byte[] hash = digest.digest(content);
+ return DatatypeConverter.printHexBinary(hash);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index f4de0d0..8538a95 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -83,6 +83,10 @@ public class StoredMemoryMessage<T extends StorableMessageMetaData> implements S
try (QpidByteBuffer combined = QpidByteBuffer.concatenate(_content))
{
+ if (length == Integer.MAX_VALUE)
+ {
+ length = combined.remaining();
+ }
return combined.view(offset, length);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
index 98b54bd..a1be172 100755
--- a/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/StoredMessage.java
@@ -28,6 +28,11 @@ public interface StoredMessage<M extends StorableMessageMetaData>
long getMessageNumber();
+ /**
+ * Returns length bytes of message content beginning from the given offset. Caller is responsible
+ * for the disposal of the returned buffer. If length is {@link Integer#MAX_VALUE}, length is not
+ * constrained.
+ */
QpidByteBuffer getContent(int offset, int length);
int getContentSize();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index e02f24a..f33dfe3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -181,7 +181,9 @@ public class Message_1_0 extends AbstractServerMessageImpl<Message_1_0, MessageM
try
{
List<EncodingRetainingSection<?>> sections;
- try (QpidByteBuffer allSectionsContent = super.getContent())
+ // The v0 message format put all sections within the content, so we need to read all the stored content
+ // not just #getSize()
+ try (QpidByteBuffer allSectionsContent = super.getContent(0, Integer.MAX_VALUE))
{
sections = sectionDecoder.parseAll(allSectionsContent);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/e9e0e74e/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
index 9c6cfa9..aeff70e 100644
--- a/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
+++ b/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/AbstractJDBCMessageStore.java
@@ -1435,7 +1435,12 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public synchronized QpidByteBuffer getContent(int offset, int length)
{
- return getContentAsByteBuffer().view(offset, length);
+ QpidByteBuffer contentAsByteBuffer = getContentAsByteBuffer();
+ if (length == Integer.MAX_VALUE)
+ {
+ length = contentAsByteBuffer.remaining();
+ }
+ return contentAsByteBuffer.view(offset, length);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org