You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/12 07:13:37 UTC
[1/2] qpid-broker-j git commit: QPID-8060: [Broker-J] [AMQP
0-8..0-9-1] Remove redundant comment left by this last commit
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 79aecbce0 -> 7b4e3e8d5
QPID-8060: [Broker-J] [AMQP 0-8..0-9-1] Remove redundant comment left by this last commit
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7640cfb3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7640cfb3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7640cfb3
Branch: refs/heads/master
Commit: 7640cfb3a9183905f534195d24bc848b2aea5cbc
Parents: 79aecbc
Author: Keith Wall <kw...@apache.org>
Authored: Mon Dec 11 18:12:45 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Dec 11 18:12:56 2017 +0000
----------------------------------------------------------------------
.../test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7640cfb3/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index 9a52a45..cdb2dd5 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -833,7 +833,6 @@ public class QueueTest extends BrokerAdminUsingTestBase
.declareName(BrokerAdmin.TEST_QUEUE_NAME)
.declareArguments(Collections.singletonMap("alternateExchange", "notKnown")).declare()
.consumeResponse().getLatestResponse(ConnectionCloseBody.class);
- // TODO server fails - jira required
assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-10] Add
queue protocol tests
Posted by kw...@apache.org.
QPID-8038: [Broker-J] [AMQP 0-10] Add queue protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7b4e3e8d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7b4e3e8d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7b4e3e8d
Branch: refs/heads/master
Commit: 7b4e3e8d59d263757f58664b775dcf55a927e360
Parents: 7640cfb
Author: Keith Wall <kw...@apache.org>
Authored: Mon Dec 11 18:12:08 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Dec 11 22:36:08 2017 +0000
----------------------------------------------------------------------
.../qpid/tests/protocol/v0_10/Interaction.java | 7 +
.../protocol/v0_10/MessageInteraction.java | 23 +-
.../tests/protocol/v0_10/QueueInteraction.java | 109 ++++
.../qpid/tests/protocol/v0_10/MessageTest.java | 2 +-
.../qpid/tests/protocol/v0_10/QueueTest.java | 598 +++++++++++++++++++
.../tests/protocol/v0_10/TransactionTest.java | 2 +-
6 files changed, 738 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7b4e3e8d/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
index d636290..0679386 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -38,6 +38,7 @@ public class Interaction extends AbstractInteraction<Interaction>
private SessionInteraction _sessionInteraction;
private MessageInteraction _messageInteraction;
private ExecutionInteraction _executionInteraction;
+ private QueueInteraction _queueInteraction;
private int _channelId;
private TxInteraction _txInteraction;
@@ -49,6 +50,7 @@ public class Interaction extends AbstractInteraction<Interaction>
_messageInteraction = new MessageInteraction(this);
_executionInteraction = new ExecutionInteraction(this);
_txInteraction = new TxInteraction(this);
+ _queueInteraction = new QueueInteraction(this);
}
@Override
@@ -136,4 +138,9 @@ public class Interaction extends AbstractInteraction<Interaction>
{
return _txInteraction;
}
+
+ public QueueInteraction queue()
+ {
+ return _queueInteraction;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7b4e3e8d/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
index 4660c86..4645d46 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -23,6 +23,7 @@ package org.apache.qpid.tests.protocol.v0_10;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCancel;
import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
import org.apache.qpid.server.protocol.v0_10.transport.MessageFlow;
import org.apache.qpid.server.protocol.v0_10.transport.MessageSubscribe;
@@ -34,6 +35,7 @@ public class MessageInteraction
private final Interaction _interaction;
private MessageTransfer _transfer;
private MessageSubscribe _subscribe;
+ private MessageCancel _cancel;
private MessageFlow _flow;
private MessageAccept _accept;
@@ -42,6 +44,7 @@ public class MessageInteraction
_interaction = interaction;
_transfer = new MessageTransfer();
_subscribe = new MessageSubscribe();
+ _cancel = new MessageCancel();
_flow = new MessageFlow();
_accept = new MessageAccept();
}
@@ -52,7 +55,7 @@ public class MessageInteraction
return this;
}
- public MessageInteraction transferDesitnation(final String destination)
+ public MessageInteraction transferDestination(final String destination)
{
_transfer.setDestination(destination);
return this;
@@ -81,6 +84,24 @@ public class MessageInteraction
return _interaction.sendPerformative(_subscribe);
}
+ public MessageInteraction cancelId(final int id)
+ {
+ _cancel.setId(id);
+ return this;
+ }
+
+ public MessageInteraction cancelDestination(final String destination)
+ {
+ _cancel.setDestination(destination);
+ return this;
+ }
+
+ public Interaction cancel() throws Exception
+ {
+ _interaction.sendPerformative(_cancel);
+ return _interaction;
+ }
+
public MessageInteraction subscribeDestination(final String destination)
{
_subscribe.setDestination(destination);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7b4e3e8d/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
new file mode 100644
index 0000000..7e15099
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/QueueInteraction.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.QueueDeclare;
+import org.apache.qpid.server.protocol.v0_10.transport.QueueDelete;
+import org.apache.qpid.server.protocol.v0_10.transport.TxCommit;
+import org.apache.qpid.server.protocol.v0_10.transport.TxSelect;
+
+public class QueueInteraction
+{
+ private final Interaction _interaction;
+ private final QueueDeclare _declare;
+ private final QueueDelete _delete;
+
+ public QueueInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ _declare = new QueueDeclare();
+ _delete = new QueueDelete();
+ }
+
+ public QueueInteraction declareQueue(final String queue)
+ {
+ _declare.setQueue(queue);
+ return this;
+ }
+
+ public QueueInteraction declareId(final int id)
+ {
+ _declare.setId(id);
+ return this;
+ }
+ public QueueInteraction declarePassive(final boolean passive)
+ {
+ _declare.setPassive(passive);
+ return this;
+ }
+
+ public QueueInteraction declareDurable(final boolean durable)
+ {
+ _declare.setDurable(durable);
+ return this;
+ }
+
+ public QueueInteraction declareAlternateExchange(final String alternateExchange)
+ {
+ _declare.setAlternateExchange(alternateExchange);
+ return this;
+ }
+
+ public QueueInteraction declareExclusive(final boolean exclusive)
+ {
+ _declare.setExclusive(exclusive);
+ return this;
+ }
+
+ public QueueInteraction declareAutoDelete(final boolean autoDelete)
+ {
+ _declare.setAutoDelete(autoDelete);
+ return this;
+ }
+
+ public Interaction declare() throws Exception
+ {
+ return _interaction.sendPerformative(_declare);
+ }
+
+ public QueueInteraction deleteQueue(final String queueName)
+ {
+ _delete.setQueue(queueName);
+ return this;
+ }
+
+ public QueueInteraction deleteId(final int id)
+ {
+ _delete.setId(id);
+ return this;
+ }
+
+ public QueueInteraction deleteIfUnused(final boolean ifUnused)
+ {
+ _delete.ifUnused(ifUnused);
+ return this;
+ }
+
+ public Interaction delete() throws Exception
+ {
+ return _interaction.sendPerformative(_delete);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7b4e3e8d/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
index 9a477dc..4088b9b 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -75,7 +75,7 @@ public class MessageTest extends BrokerAdminUsingTestBase
.channelId(1)
.attachSession(sessionName)
.message()
- .transferDesitnation(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
.transferId(0)
.transfer()
.session()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7b4e3e8d/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java
new file mode 100644
index 0000000..b6a6fcf
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/QueueTest.java
@@ -0,0 +1,598 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionErrorCode;
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionException;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private static final byte[] SESSION_NAME = "test".getBytes(UTF_8);
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare", description = "This command creates or checks a queue.")
+ public void queueDeclare() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ SessionCompleted completed = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(SessionCompleted.class);
+
+ assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "The alternate-exchange field specifies how messages on this queue should be treated when "
+ + "they are rejected by a subscriber, or when they are orphaned by queue deletion.")
+ public void queueDeclareWithAlternateExchange() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareAlternateExchange("amq.direct")
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "if the alternate-exchange does not match the name of any existing exchange on the server, "
+ + "then an exception must be raised.")
+ public void queueDeclareAlternateExchangeNotFound() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ExecutionException response = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareAlternateExchange("unknownExchange")
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ // TODO Specification says ExecutionErrorCode.NOT_FOUND must be used.
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.ILLEGAL_ARGUMENT)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "The client MAY ask the server to assert that a queue exists without creating the queue if "
+ + "not.")
+ public void queueDeclarePassive() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declarePassive(true)
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(SessionCompleted.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "[...] If the queue does not exist, the server treats this as a failure.")
+ public void queueDeclarePassiveQueueNotFound() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ExecutionException response = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declarePassive(true)
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "If set when creating a new queue, the queue will be marked as durable. Durable queues "
+ + "remain active when a server restarts.")
+ public void queueDeclareDurable() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareDurable(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+ }
+
+ assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
+ getBrokerAdmin().restart();
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declarePassive(true)
+ .declareId(0)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(SessionCompleted.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "If the server receives a declare, bind, consume or get request for a queue that has been"
+ + "declared as exclusive by an existing client session, it MUST raise an exception.")
+ public void queueDeclareAttemptedConsumeOfExclusivelyDeclaredQueue() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareExclusive(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction2 = transport2.newInteraction();
+ ExecutionException response = interaction2.openAnonymousConnection()
+ .channelId(1)
+ .attachSession("test2".getBytes(UTF_8))
+ .message()
+ .subscribeDestination("mysub")
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.RESOURCE_LOCKED)));
+ }
+ }
+
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction2 = transport2.newInteraction();
+ interaction2.openAnonymousConnection()
+ .channelId(1)
+ .attachSession("test2".getBytes(UTF_8))
+ .message()
+ .subscribeDestination("mysub")
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "If the server receives a declare, bind, consume or get request for a queue that has been"
+ + "declared as exclusive by an existing client session, it MUST raise an exception.")
+ public void queueDeclareRedeclareOfExclusivelyDeclaredQueue() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareExclusive(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction2 = transport2.newInteraction();
+ ExecutionException response = interaction2.openAnonymousConnection()
+ .channelId(1)
+ .attachSession("test2".getBytes(UTF_8))
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareExclusive(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.RESOURCE_LOCKED)));
+ }
+ }
+
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction2 = transport2.newInteraction();
+ interaction2.openAnonymousConnection()
+ .channelId(1)
+ .attachSession("test2".getBytes(UTF_8))
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareExclusive(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "If this field [auto-delete] is set and the exclusive field is also set, then the queue "
+ + "MUST be deleted when the session closes.")
+ public void queueDeclareAutoDeleteAndExclusiveDeletedBySessionDetach() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareExclusive(true)
+ .declareAutoDelete(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class)
+ .session()
+ .detachName(SESSION_NAME)
+ .detach()
+ .consumeResponse(SessionDetached.class);
+
+ ExecutionException response = interaction.channelId(2)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declarePassive(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.declare",
+ description = "If this field is set and the exclusive field is not set the queue is deleted when all the "
+ + "consumers have finished using it. Last consumer can be cancelled either explicitly or "
+ + "because its session is closed.")
+ public void queueDeclareAutoDeleteDeletedByLastConsumerCancelled() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declareAutoDelete(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+ }
+
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction2 = transport2.newInteraction();
+ String subscriberName = "mysub";
+ interaction2.openAnonymousConnection()
+ .channelId(1)
+ .attachSession("test2".getBytes(UTF_8))
+ .queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(0)
+ .declarePassive(true)
+ .declare()
+ .message()
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(1)
+ .subscribe()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class)
+ .message()
+ .cancelId(2)
+ .cancelDestination(subscriberName)
+ .cancel()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+
+ ExecutionException response = interaction2.queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declareId(3)
+ .declarePassive(true)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.delete", description = "This command deletes a queue.")
+ public void queueDelete() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .deleteQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .deleteId(0)
+ .delete()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+
+ ExecutionException response = interaction.queue()
+ .declareQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .declarePassive(true)
+ .declareId(1)
+ .declare()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.delete",
+ description = "The queue must exist. If the client attempts to delete a non-existing queue the server "
+ + "MUST raise an exception.")
+ public void queueDeleteQueueNotFound() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ExecutionException response = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .queue()
+ .deleteQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .deleteId(0)
+ .delete()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.queue.delete",
+ description = "If set, the server will only delete the queue if it has no consumers. If the queue has "
+ + "consumers the server does does not delete it but raises an exception instead.")
+ public void queueDeleteQueueDeleteWithConsumer() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ try (FrameTransport consumerTransport = new FrameTransport(_brokerAddress).connect();
+ FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction consumerInteraction = consumerTransport.newInteraction();
+ String subscriberName = "mysub";
+ consumerInteraction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(SESSION_NAME)
+ .message()
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(1)
+ .subscribe()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+
+ final Interaction interaction = transport.newInteraction();
+ ExecutionException response = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession("test2".getBytes(UTF_8))
+ .queue()
+ .deleteQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .deleteId(0)
+ .deleteIfUnused(true)
+ .delete()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCommandPoint.class)
+ .consumeResponse()
+ .getLatestResponse(ExecutionException.class);
+
+ assertThat(response.getErrorCode(), is(equalTo(ExecutionErrorCode.PRECONDITION_FAILED)));
+
+ consumerInteraction.message()
+ .cancelId(2)
+ .cancelDestination(subscriberName)
+ .cancel()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+
+ consumerInteraction.queue()
+ .deleteQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .deleteId(0)
+ .deleteIfUnused(true)
+ .delete()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse(SessionCompleted.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7b4e3e8d/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
index 337ada3..443aede 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -60,7 +60,7 @@ public class TransactionTest extends BrokerAdminUsingTestBase
.attachSession(sessionName)
.tx().selectId(0).select()
.message()
- .transferDesitnation(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferDestination(BrokerAdmin.TEST_QUEUE_NAME)
.transferId(1)
.transfer()
.session()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org