You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/06/12 11:14:15 UTC
qpid-broker-j git commit: QPID-7648: [Java Broker] Reject AMQP 1.0
durable messages if no persistent store is configured.
Repository: qpid-broker-j
Updated Branches:
refs/heads/master b36c7180f -> 82ef9deaa
QPID-7648: [Java Broker] Reject AMQP 1.0 durable messages if no persistent store is configured.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/82ef9dea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/82ef9dea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/82ef9dea
Branch: refs/heads/master
Commit: 82ef9deaa97508998fc0db3e56969242778de4a3
Parents: b36c718
Author: Lorenz Quack <lq...@apache.org>
Authored: Mon Jun 12 12:10:43 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Mon Jun 12 12:10:43 2017 +0100
----------------------------------------------------------------------
.../qpid/server/store/MemoryMessageStore.java | 2 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 25 +++-
pom.xml | 7 +-
.../tests/protocol/v1_0/MessageEncoder.java | 20 +--
.../protocol/v1_0/messaging/TransferTest.java | 142 +++++++++++++++++++
5 files changed, 184 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 0da7db7..d9992ec 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -321,7 +321,7 @@ public class MemoryMessageStore implements MessageStore
@Override
public boolean isPersistent()
{
- return false;
+ return Boolean.parseBoolean(System.getProperty("qpid.tests.mms.messagestore.persistence", "false"));
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 331db41..e33bbb6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
@@ -237,9 +238,29 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
session.getAMQPConnection()
.checkAuthorizedMessagePrincipal(serverMessage.getMessageHeader().getUserId());
- Outcome outcome = getReceivingDestination().send(serverMessage, transaction,
- session.getSecurityToken());
+ Outcome outcome;
Source source = getSource();
+ if (serverMessage.isPersistent() && !getAddressSpace().getMessageStore().isPersistent())
+ {
+ final Error preconditionFailedError = new Error(AmqpError.PRECONDITION_FAILED,
+ "Non-durable message store cannot accept durable message.");
+ if (source.getOutcomes() != null && Arrays.asList(source.getOutcomes())
+ .contains(Rejected.REJECTED_SYMBOL))
+ {
+ final Rejected rejected = new Rejected();
+ rejected.setError(preconditionFailedError);
+ outcome = rejected;
+ }
+ else
+ {
+ return preconditionFailedError;
+ }
+ }
+ else
+ {
+ outcome = getReceivingDestination().send(serverMessage, transaction,
+ session.getSecurityToken());
+ }
DeliveryState resultantState;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 45f6fa1..612ba6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,6 +112,7 @@
<profile.test_receive_timeout>1000</profile.test_receive_timeout>
<profile.java.naming.factory.initial>org.apache.qpid.jndi.PropertiesFileInitialContextFactory</profile.java.naming.factory.initial>
<profile.java.naming.provider.url>test-profiles${file.separator}test-provider.properties</profile.java.naming.provider.url>
+ <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
<dollar.sign>$</dollar.sign>
<at.sign>@</at.sign>
@@ -531,6 +532,7 @@
<test.output.dir>${test.output.dir}</test.output.dir>
<broker.clean.between.tests>true</broker.clean.between.tests>
<qpid.test_receive_timeout>${profile.test_receive_timeout}</qpid.test_receive_timeout>
+ <qpid.tests.mms.messagestore.persistence>${profile.qpid.tests.mms.messagestore.persistence}</qpid.tests.mms.messagestore.persistence>
<java.naming.factory.initial>${profile.java.naming.factory.initial}</java.naming.factory.initial>
<java.naming.provider.url>${profile.java.naming.provider.url}</java.naming.provider.url>
</systemPropertyVariables>
@@ -732,6 +734,7 @@
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+ <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
</properties>
</profile>
@@ -751,6 +754,7 @@
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+ <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
</properties>
</profile>
@@ -770,6 +774,7 @@
<profile.broker.persistent>false</profile.broker.persistent>
<profile.virtualhostnode.type>Memory</profile.virtualhostnode.type>
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+ <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
</properties>
</profile>
@@ -965,7 +970,7 @@
<profile.virtualhostnode.context.blueprint>{"type":"ProvidedStore","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
<profile.java.naming.factory.initial>org.apache.qpid.jms.jndi.JmsInitialContextFactory</profile.java.naming.factory.initial>
<profile.java.naming.provider.url>test-profiles${file.separator}test-provider-1-0.properties</profile.java.naming.provider.url>
-
+ <profile.qpid.tests.mms.messagestore.persistence>true</profile.qpid.tests.mms.messagestore.persistence>
</properties>
</profile>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
index 78ffe9d..0c77ab1 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageEncoder.java
@@ -25,29 +25,33 @@ import java.util.LinkedList;
import java.util.List;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
-import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
public class MessageEncoder
{
- private static final AMQPDescribedTypeRegistry
- AMQP_DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer();
+ private Header _header;
private List<String> _data = new LinkedList<>();
- private SectionEncoder _encoder = new SectionEncoderImpl(AMQP_DESCRIBED_TYPE_REGISTRY);
public void addData(final String data)
{
_data.add(data);
}
+ public void setHeader(Header header)
+ {
+ _header = header;
+ }
+
public List<QpidByteBuffer> getPayload()
{
List<QpidByteBuffer> payload = new ArrayList<>();
+ if (_header != null)
+ {
+ payload.addAll(_header.createEncodingRetainingSection().getEncodedForm());
+ }
+
if (_data.isEmpty())
{
throw new IllegalStateException("Message should have at least one data section");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/82ef9dea/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index c099e20..8d93002 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -32,15 +32,19 @@ import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -66,14 +70,31 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
public class TransferTest extends ProtocolTestBase
{
private InetSocketAddress _brokerAddress;
+ private String _originalMmsMessageStorePersistence;
@Before
public void setUp()
{
+ _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence");
+ System.setProperty("qpid.tests.mms.messagestore.persistence", "false");
+
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
_brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
}
+ @After
+ public void tearDown()
+ {
+ if (_originalMmsMessageStorePersistence != null)
+ {
+ System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+ }
+ else
+ {
+ System.clearProperty("qpid.tests.mms.messagestore.persistence");
+ }
+ }
+
@Test
@SpecificationTest(section = "1.3.4",
description = "Transfer without mandatory fields should result in a decoding error.")
@@ -278,6 +299,8 @@ public class TransferTest extends ProtocolTestBase
MessageEncoder messageEncoder = new MessageEncoder();
messageEncoder.addData("foo");
Transfer transfer = new Transfer();
+ transfer.setDeliveryId(UnsignedInteger.ONE);
+ transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
transfer.setHandle(linkHandle);
transfer.setPayload(messageEncoder.getPayload());
transfer.setSettled(Boolean.TRUE);
@@ -324,4 +347,123 @@ public class TransferTest extends ProtocolTestBase
assertThat(closeResponse.getFrameBody(), is(instanceOf(Close.class)));
}
}
+
+ @Test
+ @SpecificationTest(section = "3.2.1",
+ description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and "
+ + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages "
+ + "where the durable header is set to true: if the source allows the rejected outcome then the "
+ + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
+ + "detached by the receiver with the same error.")
+ public void durableTransferWithRejectedOutcome() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ {
+ final Attach attach = new Attach();
+ attach.setName("testLink");
+ attach.setRole(Role.SENDER);
+ final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+ attach.setHandle(linkHandle);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ Target target = new Target();
+ target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+ attach.setTarget(target);
+ final Source source = new Source();
+ source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+ attach.setSource(source);
+ transport.doAttachSendingLink(attach);
+
+ MessageEncoder messageEncoder = new MessageEncoder();
+ final Header header = new Header();
+ header.setDurable(true);
+ messageEncoder.setHeader(header);
+ messageEncoder.addData("test message data.");
+ Transfer transfer = new Transfer();
+ transfer.setDeliveryId(UnsignedInteger.ONE);
+ transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+ transfer.setHandle(linkHandle);
+ transfer.setPayload(messageEncoder.getPayload());
+
+ transport.sendPerformative(transfer);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ if (getBrokerAdmin().supportsRestart())
+ {
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+ final Disposition receivedDisposition = (Disposition) response.getFrameBody();
+ assertThat(receivedDisposition.getSettled(), is(true));
+ assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class)));
+ assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL));
+ }
+ else
+ {
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+ final Disposition receivedDisposition = (Disposition) response.getFrameBody();
+ assertThat(receivedDisposition.getSettled(), is(true));
+ assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class)));
+ assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Rejected.REJECTED_SYMBOL));
+ }
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "3.2.1",
+ description = "Durable messages MUST NOT be lost even if an intermediary is unexpectedly terminated and "
+ + "restarted. A target which is not capable of fulfilling this guarantee MUST NOT accept messages "
+ + "where the durable header is set to true: if the source allows the rejected outcome then the "
+ + "message SHOULD be rejected with the precondition-failed error, otherwise the link MUST be "
+ + "detached by the receiver with the same error.")
+ public void durableTransferWithoutRejectedOutcome() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ {
+ final Attach attach = new Attach();
+ attach.setName("testLink");
+ attach.setRole(Role.SENDER);
+ final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+ attach.setHandle(linkHandle);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ Target target = new Target();
+ target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+ attach.setTarget(target);
+ final Source source = new Source();
+ source.setOutcomes(Accepted.ACCEPTED_SYMBOL);
+ attach.setSource(source);
+ transport.doAttachSendingLink(attach);
+
+ MessageEncoder messageEncoder = new MessageEncoder();
+ final Header header = new Header();
+ header.setDurable(true);
+ messageEncoder.setHeader(header);
+ messageEncoder.addData("test message data.");
+ Transfer transfer = new Transfer();
+ transfer.setDeliveryId(UnsignedInteger.ONE);
+ transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+ transfer.setHandle(linkHandle);
+ transfer.setPayload(messageEncoder.getPayload());
+
+ transport.sendPerformative(transfer);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ if (getBrokerAdmin().supportsRestart())
+ {
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+ final Disposition receivedDisposition = (Disposition) response.getFrameBody();
+ assertThat(receivedDisposition.getSettled(), is(true));
+ assertThat(receivedDisposition.getState(), is(instanceOf(Outcome.class)));
+ assertThat(((Outcome) receivedDisposition.getState()).getSymbol(), is(Accepted.ACCEPTED_SYMBOL));
+ }
+ else
+ {
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Detach.class)));
+ final Detach receivedDetach = (Detach) response.getFrameBody();
+ assertThat(receivedDetach.getError(), is(notNullValue()));
+ assertThat(receivedDetach.getError().getCondition(), is(AmqpError.PRECONDITION_FAILED));
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org