You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/22 15:02:47 UTC
[1/2] qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 0-10] Add
protocol tests for AMQP 0-10
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 612c2cb6c -> ff2980e2d
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
new file mode 100644
index 0000000..9a477dc
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/MessageTest.java
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.Range;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionConfirmed;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class MessageTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ @SpecificationTest(section = "10.message.transfer",
+ description = "This command transfers a message between two peers.")
+ public void sendTransfer() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(UTF_8);
+ SessionCompleted completed = interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .message()
+ .transferDesitnation(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferId(0)
+ .transfer()
+ .session()
+ .flushCompleted()
+ .flush()
+ .consumeResponse()
+ .getLatestResponse(SessionCompleted.class);
+
+ assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+ int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(queueDepthMessages, is(equalTo(1)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.message.subscribe",
+ description = "This command asks the server to start a \"subscription\","
+ + " which is a request for messages from a specific queue.")
+ public void subscribe() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "testSession".getBytes(UTF_8);
+ final String subscriberName = "testSubscriber";
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .message()
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ SessionCompleted completed = consumeResponse(interaction,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
+
+ assertThat(completed.getCommands(), is(notNullValue()));
+ assertThat(completed.getCommands().includes(0), is(equalTo(true)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.message.transfer",
+ description = "The client may request a broker to transfer messages to it, from a particular queue,"
+ + " by issuing a subscribe command. The subscribe command specifies the destination"
+ + " that the broker should use for any resulting transfers.")
+ public void receiveTransfer() throws Exception
+ {
+ String testMessageBody = "testMessage";
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "testSession".getBytes(UTF_8);
+ final String subscriberName = "testSubscriber";
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .message()
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .message()
+ .flowId(1)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.MESSAGE)
+ .flowValue(1)
+ .flow()
+ .message()
+ .flowId(2)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.BYTE)
+ .flowValue(-1)
+ .flow();
+
+ MessageTransfer transfer = consumeResponse(interaction,
+ MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
+
+ try (QpidByteBuffer buffer = transfer.getBody())
+ {
+ final byte[] dst = new byte[buffer.remaining()];
+ buffer.get(dst);
+ assertThat(new String(dst, UTF_8), is(equalTo(testMessageBody)));
+ }
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "10.message.accept",
+ description = "Accepts the message.")
+ public void acceptTransfer() throws Exception
+ {
+ String testMessageBody = "testMessage";
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, testMessageBody);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "testSession".getBytes(UTF_8);
+ final String subscriberName = "testSubscriber";
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .message()
+ .subscribeAcceptMode(MessageAcceptMode.EXPLICIT)
+ .subscribeAcquireMode(MessageAcquireMode.PRE_ACQUIRED)
+ .subscribeDestination(subscriberName)
+ .subscribeQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .subscribeId(0)
+ .subscribe()
+ .message()
+ .flowId(1)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.MESSAGE)
+ .flowValue(1)
+ .flow()
+ .message()
+ .flowId(2)
+ .flowDestination(subscriberName)
+ .flowUnit(MessageCreditUnit.BYTE)
+ .flowValue(-1)
+ .flow();
+
+ MessageTransfer transfer = consumeResponse(interaction,
+ MessageTransfer.class,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+ RangeSet transfers = Range.newInstance(transfer.getId());
+ interaction.message().acceptId(3).acceptTransfers(transfers).accept()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ SessionCompleted completed = consumeResponse(interaction,
+ SessionCompleted.class,
+ SessionCommandPoint.class,
+ SessionConfirmed.class,
+ SessionFlush.class);
+
+ assertThat(completed.getCommands(), is(notNullValue()));
+ assertThat(completed.getCommands().includes(3), is(equalTo(true)));
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ private <T extends Method> T consumeResponse(final Interaction interaction,
+ final Class<T> expected,
+ final Class<? extends Method>... ignore)
+ throws Exception
+ {
+ List<Class<? extends Method>> possibleResponses = new ArrayList<>(Arrays.asList(ignore));
+ possibleResponses.add(expected);
+
+ T completed = null;
+ do
+ {
+ interaction.consumeResponse(possibleResponses.toArray(new Class[possibleResponses.size()]));
+ Response<?> response = interaction.getLatestResponse();
+ if (expected.isAssignableFrom(response.getBody().getClass()))
+ {
+ completed = (T) response.getBody();
+ }
+ }
+ while (completed == null);
+ return completed;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
new file mode 100644
index 0000000..51de2c2
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/SessionTest.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetached;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class SessionTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "9.session.attach",
+ description = "Requests that the current transport be attached to the named session.")
+ public void attach() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ SessionAttached sessionAttached = interaction.openAnonymousConnection()
+ .channelId(1)
+ .session()
+ .attachName(sessionName)
+ .attach()
+ .consumeResponse()
+ .getLatestResponse(SessionAttached.class);
+ assertThat(sessionAttached.getName(), IsEqual.equalTo(sessionName));
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "9.session.detach",
+ description = "Detaches the current transport from the named session.")
+ public void detach() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ SessionDetached sessionDetached = interaction.openAnonymousConnection()
+ .channelId(1)
+ .session()
+ .attachName(sessionName)
+ .attach()
+ .consumeResponse(SessionAttached.class)
+ .session()
+ .detachName(sessionName)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(SessionDetached.class);
+
+ assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+ }
+ }
+
+
+ @Ignore("QPID-8047")
+ @Test
+ @SpecificationTest(section = "9.session",
+ description = "The transport MUST be attached in order to use any control other than"
+ + " \"attach\", \"attached\", \"detach\", or \"detached\"."
+ + " A peer receiving any other control on a detached transport MUST discard it and send a"
+ + " session.detached with the \"not-attached\" reason code.")
+ public void detachUnknownSession() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(StandardCharsets.UTF_8);
+ SessionDetached sessionDetached = interaction.openAnonymousConnection()
+ .channelId(1)
+ .session()
+ .detachName(sessionName)
+ .detach()
+ .consumeResponse()
+ .getLatestResponse(SessionDetached.class);
+
+ assertThat(sessionDetached.getName(), IsEqual.equalTo(sessionName));
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
new file mode 100644
index 0000000..337ada3
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/TransactionTest.java
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCompleted;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class TransactionTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ @SpecificationTest(section = "10.tx.commit",
+ description = "This command commits all messages published and accepted in the current transaction.")
+ public void messageSendCommit() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ byte[] sessionName = "test".getBytes(UTF_8);
+ interaction.openAnonymousConnection()
+ .channelId(1)
+ .attachSession(sessionName)
+ .tx().selectId(0).select()
+ .message()
+ .transferDesitnation(BrokerAdmin.TEST_QUEUE_NAME)
+ .transferId(1)
+ .transfer()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ SessionCompleted completed;
+ do
+ {
+ completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+ }
+ while (!completed.getCommands().includes(1));
+
+ int queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(queueDepthMessages, is(equalTo(0)));
+
+ interaction.tx().commitId(2).commit()
+ .session()
+ .flushCompleted()
+ .flush();
+
+ completed = interaction.consumeResponse().getLatestResponse(SessionCompleted.class);
+ assertThat(completed.getCommands().includes(2), is(equalTo(true)));
+
+ queueDepthMessages = getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(queueDepthMessages, is(equalTo(1)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
new file mode 100644
index 0000000..f4ed0a2
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareBody;
+
+public class ExchangeInteraction
+{
+ private Interaction _interaction;
+ private String _decalreExchange = "amq.direct";
+ private String _declareType = "direct";
+ private boolean _declarePassive = true;
+ private boolean _declareDurable = true;
+ private boolean _declareAutoDelete = false;
+ private boolean _declareNoWait = false;
+ private Map<String, Object> _declareArguments = new HashMap<>();
+
+ public ExchangeInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ }
+
+ public Interaction declare() throws Exception
+ {
+ return _interaction.sendPerformative(new ExchangeDeclareBody(0,
+ AMQShortString.valueOf(_decalreExchange),
+ AMQShortString.valueOf(_declareType),
+ _declarePassive,
+ _declareDurable,
+ _declareAutoDelete,
+ false,
+ _declareNoWait,
+ FieldTable.convertToFieldTable(_declareArguments)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
index 52cd7a0..0b5c4e4 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -26,18 +26,20 @@ import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
-public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
+public class FrameTransport extends AbstractFrameTransport<Interaction>
{
private final byte[] _protocolHeader;
private ProtocolVersion _protocolVersion;
- public FrameTransport(final InetSocketAddress brokerAddress)
+ FrameTransport(final InetSocketAddress brokerAddress)
{
this(brokerAddress, Protocol.AMQP_0_9_1);
}
- public FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
+
+ FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
{
super(brokerAddress, new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder());
_protocolVersion = getProtocolVersion(protocol);
@@ -74,12 +76,12 @@ public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTranspor
return _protocolHeader;
}
- public ProtocolVersion getProtocolVersion()
+ ProtocolVersion getProtocolVersion()
{
return _protocolVersion;
}
- public static ProtocolVersion getProtocolVersion(Protocol protocol)
+ private static ProtocolVersion getProtocolVersion(Protocol protocol)
{
final ProtocolVersion protocolVersion;
switch (protocol)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 35f4bf5..b990eae 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -26,8 +26,9 @@ import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
-public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
+public class Interaction extends AbstractInteraction<Interaction>
{
private int _channelId;
@@ -36,6 +37,8 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
private ChannelInteraction _channelInteraction;
private QueueInteraction _queueInteraction;
private BasicInteraction _basicInteraction;
+ private TxInteraction _txInteraction;
+ private ExchangeInteraction _exchangeInteraction;
Interaction(final FrameTransport transport)
{
@@ -44,6 +47,8 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
_channelInteraction = new ChannelInteraction(this);
_queueInteraction = new QueueInteraction(this);
_basicInteraction = new BasicInteraction(this);
+ _txInteraction = new TxInteraction(this);
+ _exchangeInteraction = new ExchangeInteraction(this);
}
@Override
@@ -52,12 +57,6 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
return getTransport().getProtocolHeader();
}
- @Override
- protected Interaction getInteraction()
- {
- return this;
- }
-
public Interaction sendPerformative(final AMQBody amqBody) throws Exception
{
return sendPerformative(_channelId, amqBody);
@@ -120,4 +119,14 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
{
return _basicInteraction;
}
+
+ public TxInteraction tx()
+ {
+ return _txInteraction;
+ }
+
+ public ExchangeInteraction exchange()
+ {
+ return _exchangeInteraction;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java
new file mode 100644
index 0000000..63a078a
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/TxInteraction.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.transport.TxCommitBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectBody;
+
+public class TxInteraction
+{
+ private Interaction _interaction;
+
+ public TxInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ }
+
+ public Interaction select() throws Exception
+ {
+ return _interaction.sendPerformative(TxSelectBody.INSTANCE);
+ }
+
+ public Interaction commit() throws Exception
+ {
+ return _interaction.sendPerformative(TxCommitBody.INSTANCE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
index b5cf60d..40d1918 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
@@ -58,7 +58,7 @@ public class ChannelTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "1.5.2.5", description = "request a channel close")
- public void channelClose() throws Exception
+ public void noFrameCanBeSentOnClosedChannel() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
index b4909c6..fa4a692 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -31,8 +31,10 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -119,7 +121,8 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
.tuneOkFrameMax(1024)
.tuneOkHeartbeat(response.getHeartbeat())
.tuneOk()
- .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ .connection().open()
+ .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
}
}
@@ -144,7 +147,8 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
.tuneOkFrameMax(Long.MAX_VALUE)
.tuneOkHeartbeat(response.getHeartbeat())
.tuneOk()
- .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ .connection().open()
+ .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
}
}
@@ -180,4 +184,22 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
}
+ @Test
+ @SpecificationTest(section = "9",
+ description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+ public void authenticationBypassAfterSendingStartOk() throws Exception
+ {
+ InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("PLAIN").startOk().consumeResponse(ConnectionSecureBody.class)
+ .connection().tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
new file mode 100644
index 0000000..df677e8
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/TransactionTest.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxCommitOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class TransactionTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ @SpecificationTest(section = "1.9.2.3", description = "commit the current transaction")
+ public void publishMessage() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .tx().select().consumeResponse(TxSelectOkBody.class)
+ .basic().contentHeaderPropertiesContentType("text/plain")
+ .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+ .contentHeaderPropertiesDeliveryMode((byte)1)
+ .contentHeaderPropertiesPriority((byte)1)
+ .publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .content("Test")
+ .publishMessage()
+ .exchange().declare().consumeResponse(ExchangeDeclareOkBody.class);
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+
+ interaction.tx().commit().consumeResponse(TxCommitOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index dd59757..219c423 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -23,7 +23,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.InetSocketAddress;
-public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+
+public class FrameTransport extends AbstractFrameTransport<Interaction>
{
public FrameTransport(final InetSocketAddress brokerAddress)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 4aad6ee..b2f8147 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -77,9 +77,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
import org.apache.qpid.tests.protocol.Response;
-public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
+public class Interaction extends AbstractInteraction<Interaction>
{
private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Begin _begin;
@@ -177,12 +178,6 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
return _protocolHeader;
}
- @Override
- protected Interaction getInteraction()
- {
- return this;
- }
-
//////////
// SASL //
//////////
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
new file mode 100644
index 0000000..cad8415
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractFrameTransport.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public abstract class AbstractFrameTransport<I extends AbstractInteraction<I>> implements AutoCloseable
+{
+ static final long RESPONSE_TIMEOUT =
+ Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+ private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+ private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+ private final EventLoopGroup _workerGroup;
+ private final InetSocketAddress _brokerAddress;
+ private final InputHandler _inputHandler;
+ private final OutputHandler _outputHandler;
+
+ private volatile Channel _channel;
+ private volatile boolean _channelClosedSeen = false;
+
+ public AbstractFrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+ {
+ _brokerAddress = brokerAddress;
+ _inputHandler = new InputHandler(_queue, inputDecoder);
+ _outputHandler = new OutputHandler(outputEncoder);
+ _workerGroup = new NioEventLoopGroup();
+ }
+
+ public InetSocketAddress getBrokerAddress()
+ {
+ return _brokerAddress;
+ }
+
+ public AbstractFrameTransport<I> connect()
+ {
+ try
+ {
+ Bootstrap b = new Bootstrap();
+ b.group(_workerGroup);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+ b.handler(new ChannelInitializer<SocketChannel>()
+ {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception
+ {
+ ChannelPipeline pipeline = ch.pipeline();
+ buildInputOutputPipeline(pipeline);
+ }
+ });
+
+ _channel = b.connect(_brokerAddress).sync().channel();
+ _channel.closeFuture().addListener(future ->
+ {
+ _channelClosedSeen = true;
+ _queue.add(CHANNEL_CLOSED_RESPONSE);
+ });
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+ {
+ pipeline.addLast(_inputHandler).addLast(_outputHandler);
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ try
+ {
+ if (_channel != null)
+ {
+ _channel.disconnect().sync();
+ _channel.close().sync();
+ _channel = null;
+ }
+ }
+ finally
+ {
+ _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+ }
+ }
+
+ ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+ {
+ Preconditions.checkState(_channel != null, "Not connected");
+ ChannelPromise promise = _channel.newPromise();
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(bytes);
+ _channel.write(buffer, promise);
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+
+ public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
+ {
+ Preconditions.checkState(_channel != null, "Not connected");
+ ChannelPromise promise = _channel.newPromise();
+ _channel.write(data, promise);
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+
+ <T extends Response<?>> T getNextResponse() throws Exception
+ {
+ return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public void assertNoMoreResponses() throws Exception
+ {
+ Response response = getNextResponse();
+ assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+ }
+
+ public void assertNoMoreResponsesAndChannelClosed() throws Exception
+ {
+ assertNoMoreResponses();
+ assertThat(_channelClosedSeen, is(true));
+ }
+
+ public void flush()
+ {
+ _channel.flush();
+ }
+
+ public abstract byte[] getProtocolHeader();
+
+ public abstract I newInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
new file mode 100644
index 0000000..4b41ca9
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/AbstractInteraction.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class AbstractInteraction<I extends AbstractInteraction<I>>
+{
+ private final AbstractFrameTransport<I> _transport;
+ private ListenableFuture<?> _latestFuture;
+ private Response<?> _latestResponse;
+
+ public AbstractInteraction(final AbstractFrameTransport<I> frameTransport)
+ {
+ _transport = frameTransport;
+ }
+
+ public I consumeResponse(final Class<?>... responseTypes) throws Exception
+ {
+ sync();
+ _latestResponse = getNextResponse();
+ final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+ if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+ || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+ {
+ return getInteraction();
+ }
+ acceptableResponseClasses.remove(null);
+ if (_latestResponse != null)
+ {
+ for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ {
+ if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ return getInteraction();
+ }
+ }
+ }
+ throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+ acceptableResponseClasses,
+ _latestResponse == null ? null : _latestResponse.getBody()));
+ }
+
+ protected Response<?> getNextResponse() throws Exception
+ {
+ return _transport.getNextResponse();
+ }
+
+ public I sync() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ _transport.flush();
+ if (_latestFuture != null)
+ {
+ _latestFuture.get(AbstractFrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ _latestFuture = null;
+ }
+ return getInteraction();
+ }
+
+ public Response<?> getLatestResponse() throws Exception
+ {
+ return _latestResponse;
+ }
+
+ public <T> T getLatestResponse(Class<T> type) throws Exception
+ {
+ if (_latestResponse.getBody() == null)
+ {
+ throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+ type.getSimpleName(),
+ _latestResponse.getClass()));
+ }
+
+ if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+ type.getSimpleName(),
+ _latestResponse.getBody()));
+ }
+
+ return (T) _latestResponse.getBody();
+ }
+
+ protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return future;
+ }
+
+ public I negotiateProtocol() throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return getInteraction();
+ }
+
+ protected AbstractFrameTransport getTransport()
+ {
+ return _transport;
+ }
+
+ protected abstract byte[] getProtocolHeader();
+
+ private I getInteraction()
+ {
+ return (I) this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java
new file mode 100644
index 0000000..b701023
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/ChannelClosedResponse.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+public class ChannelClosedResponse implements Response<ChannelClosedResponse>
+{
+ @Override
+ public String toString()
+ {
+ return "ChannelClosed";
+ }
+
+ @Override
+ public ChannelClosedResponse getBody()
+ {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
deleted file mode 100644
index 28dc02e..0000000
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-public abstract class FrameTransport implements AutoCloseable
-{
- public static final long RESPONSE_TIMEOUT =
- Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
- private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
- private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
- private final EventLoopGroup _workerGroup;
- private final InetSocketAddress _brokerAddress;
- private final InputHandler _inputHandler;
- private final OutputHandler _outputHandler;
-
- private volatile Channel _channel;
- private volatile boolean _channelClosedSeen = false;
-
- public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
- {
- _brokerAddress = brokerAddress;
- _inputHandler = new InputHandler(_queue, inputDecoder);
- _outputHandler = new OutputHandler(outputEncoder);
- _workerGroup = new NioEventLoopGroup();
- }
-
- public InetSocketAddress getBrokerAddress()
- {
- return _brokerAddress;
- }
-
- public FrameTransport connect()
- {
- try
- {
- Bootstrap b = new Bootstrap();
- b.group(_workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>()
- {
- @Override
- public void initChannel(SocketChannel ch) throws Exception
- {
- ChannelPipeline pipeline = ch.pipeline();
- buildInputOutputPipeline(pipeline);
- }
- });
-
- _channel = b.connect(_brokerAddress).sync().channel();
- _channel.closeFuture().addListener(future ->
- {
- _channelClosedSeen = true;
- _queue.add(CHANNEL_CLOSED_RESPONSE);
- });
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- return this;
- }
-
- protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
- {
- pipeline.addLast(_inputHandler).addLast(_outputHandler);
- }
-
- @Override
- public void close() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.disconnect().sync();
- _channel.close().sync();
- _channel = null;
- }
- }
- finally
- {
- _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
- }
- }
-
- public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
- {
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(bytes);
- _channel.write(buffer, promise);
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
-
- public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
- {
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- _channel.write(data, promise);
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
-
- public <T extends Response<?>> T getNextResponse() throws Exception
- {
- return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- }
-
- public void assertNoMoreResponses() throws Exception
- {
- Response response = getNextResponse();
- assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
- }
-
- public void assertNoMoreResponsesAndChannelClosed() throws Exception
- {
- assertNoMoreResponses();
- assertThat(_channelClosedSeen, is(true));
- }
-
- public void flush()
- {
- _channel.flush();
- }
-
- private static class ChannelClosedResponse implements Response<Void>
- {
- @Override
- public String toString()
- {
- return "ChannelClosed";
- }
-
- @Override
- public Void getBody()
- {
- return null;
- }
- }
-
- public abstract byte[] getProtocolHeader();
-
- protected abstract Interaction newInteraction();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
deleted file mode 100644
index b6e631d..0000000
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.tests.protocol;
-
-import static com.google.common.util.concurrent.Futures.allAsList;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-public abstract class Interaction<I extends Interaction>
-{
- private final FrameTransport _transport;
- private ListenableFuture<?> _latestFuture;
- private Response<?> _latestResponse;
-
- public Interaction(final FrameTransport frameTransport)
- {
- _transport = frameTransport;
- }
-
- public I consumeResponse(final Class<?>... responseTypes) throws Exception
- {
- sync();
- _latestResponse = getNextResponse();
- final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
- if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
- || (acceptableResponseClasses.contains(null) && _latestResponse == null))
- {
- return getInteraction();
- }
- acceptableResponseClasses.remove(null);
- if (_latestResponse != null)
- {
- for (Class<?> acceptableResponseClass : acceptableResponseClasses)
- {
- if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- return getInteraction();
- }
- }
- }
- throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
- acceptableResponseClasses,
- _latestResponse == null ? null : _latestResponse.getBody()));
- }
-
- protected Response<?> getNextResponse() throws Exception
- {
- return _transport.getNextResponse();
- }
-
- public I sync() throws InterruptedException, ExecutionException, TimeoutException
- {
- _transport.flush();
- if (_latestFuture != null)
- {
- _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- _latestFuture = null;
- }
- return getInteraction();
- }
-
- public Response<?> getLatestResponse() throws Exception
- {
- return _latestResponse;
- }
-
- public <T> T getLatestResponse(Class<T> type) throws Exception
- {
- if (_latestResponse.getBody() == null)
- {
- throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
- type.getSimpleName(),
- _latestResponse.getClass()));
- }
-
- if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
- type.getSimpleName(),
- _latestResponse.getBody()));
- }
-
- return (T) _latestResponse.getBody();
- }
-
- protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
- {
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
- return future;
- }
-
- public I negotiateProtocol() throws Exception
- {
- final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
- return getInteraction();
- }
-
- protected FrameTransport getTransport()
- {
- return _transport;
- }
-
- protected abstract byte[] getProtocolHeader();
-
- protected abstract I getInteraction();
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 0-10] Add
protocol tests for AMQP 0-10
Posted by or...@apache.org.
QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ff2980e2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ff2980e2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ff2980e2
Branch: refs/heads/master
Commit: ff2980e2d6e9520ba204acd41a78e9ee412a2c11
Parents: 612c2cb
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Nov 21 17:16:42 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 22 15:02:23 2017 +0000
----------------------------------------------------------------------
pom.xml | 47 ++--
systests/protocol-tests-amqp-0-10/pom.xml | 109 +++++++
.../qpid/tests/protocol/v0_10/Assembler.java | 264 +++++++++++++++++
.../protocol/v0_10/ConnectionInteraction.java | 83 ++++++
.../qpid/tests/protocol/v0_10/Disassembler.java | 281 +++++++++++++++++++
.../tests/protocol/v0_10/ErrorResponse.java | 40 +++
.../protocol/v0_10/ExecutionInteraction.java | 47 ++++
.../qpid/tests/protocol/v0_10/FrameDecoder.java | 190 +++++++++++++
.../qpid/tests/protocol/v0_10/FrameEncoder.java | 87 ++++++
.../tests/protocol/v0_10/FrameTransport.java | 58 ++++
.../qpid/tests/protocol/v0_10/Interaction.java | 138 +++++++++
.../protocol/v0_10/MessageInteraction.java | 147 ++++++++++
.../protocol/v0_10/PerformativeResponse.java | 48 ++++
.../protocol/v0_10/ProtocolEventReceiver.java | 67 +++++
.../protocol/v0_10/SessionInteraction.java | 89 ++++++
.../tests/protocol/v0_10/TxInteraction.java | 60 ++++
.../resources/config-protocol-tests-0-10.json | 78 +++++
.../tests/protocol/v0_10/ConnectionTest.java | 214 ++++++++++++++
.../qpid/tests/protocol/v0_10/MessageTest.java | 260 +++++++++++++++++
.../qpid/tests/protocol/v0_10/SessionTest.java | 123 ++++++++
.../tests/protocol/v0_10/TransactionTest.java | 92 ++++++
.../protocol/v0_8/ExchangeInteraction.java | 58 ++++
.../tests/protocol/v0_8/FrameTransport.java | 12 +-
.../qpid/tests/protocol/v0_8/Interaction.java | 23 +-
.../qpid/tests/protocol/v0_8/TxInteraction.java | 44 +++
.../qpid/tests/protocol/v0_8/ChannelTest.java | 2 +-
.../tests/protocol/v0_8/ConnectionTest.java | 26 +-
.../tests/protocol/v0_8/TransactionTest.java | 79 ++++++
.../tests/protocol/v1_0/FrameTransport.java | 4 +-
.../qpid/tests/protocol/v1_0/Interaction.java | 9 +-
.../tests/protocol/AbstractFrameTransport.java | 175 ++++++++++++
.../tests/protocol/AbstractInteraction.java | 150 ++++++++++
.../tests/protocol/ChannelClosedResponse.java | 36 +++
.../qpid/tests/protocol/FrameTransport.java | 190 -------------
.../apache/qpid/tests/protocol/Interaction.java | 147 ----------
35 files changed, 3097 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e7d58bb..0c0c445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@
<module>systests/qpid-systests-jms_2.0</module>
<module>systests/protocol-tests-core</module>
<module>systests/protocol-tests-amqp-0-8</module>
+ <module>systests/protocol-tests-amqp-0-10</module>
<module>systests/protocol-tests-amqp-1-0</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
@@ -427,6 +428,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-amqp-0-10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
@@ -1332,26 +1339,26 @@
</properties>
</profile>
- <profile>
- <id>java-json.0-10</id>
- <activation>
- <property>
- <name>profile</name>
- <value>java-json.0-10</value>
- </property>
- </activation>
- <properties>
- <profile>java-json.0-10</profile>
- <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
- <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
- <profile.broker.persistent>true</profile.broker.persistent>
- <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
- <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
- </properties>
- </profile>
-
- <profile>
+ <profile>
+ <id>java-json.0-10</id>
+ <activation>
+ <property>
+ <name>profile</name>
+ <value>java-json.0-10</value>
+ </property>
+ </activation>
+ <properties>
+ <profile>java-json.0-10</profile>
+ <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
+ <profile.broker.version>v0_10</profile.broker.version>
+ <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
+ <profile.broker.persistent>true</profile.broker.persistent>
+ <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
+ <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+ </properties>
+ </profile>
+
+ <profile>
<id>cpp</id>
<activation>
<property>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/pom.xml b/systests/protocol-tests-amqp-0-10/pom.xml
new file mode 100644
index 0000000..3acf129
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.1.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-amqp-0-10</artifactId>
+ <name>Apache Qpid Protocol Tests for AMQP 0-10</name>
+ <description>Tests for AMQP 0-10</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-test-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-logging-logback</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-memory-store</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-derby-store</artifactId>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-bdbstore</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-10.json</qpid.initialConfigurationLocation>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
new file mode 100644
index 0000000..aac39b6
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+
+public class Assembler implements NetworkDelegate
+{
+
+ private static final int ARRAY_SIZE = 0xFF;
+ private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+ private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
+
+ private final ProtocolEventReceiver receiver;
+ private final Map<Integer, List<Frame>> segments;
+ private static final ThreadLocal<BBDecoder> _decoder = ThreadLocal.withInitial(BBDecoder::new);
+
+ Assembler(ProtocolEventReceiver receiver)
+ {
+ this.receiver = receiver;
+ segments = new HashMap<>();
+ }
+
+ private int segmentKey(Frame frame)
+ {
+ return (frame.getTrack() + 1) * frame.getChannel();
+ }
+
+ private List<Frame> getSegment(Frame frame)
+ {
+ return segments.get(segmentKey(frame));
+ }
+
+ private void setSegment(Frame frame, List<Frame> segment)
+ {
+ int key = segmentKey(frame);
+ if (segments.containsKey(key))
+ {
+ error(new ProtocolError(Frame.L2, "segment in progress: %s",
+ frame));
+ }
+ segments.put(segmentKey(frame), segment);
+ }
+
+ private void clearSegment(Frame frame)
+ {
+ segments.remove(segmentKey(frame));
+ }
+
+ private void emit(int channel, ProtocolEvent event)
+ {
+ event.setChannel(channel);
+ receiver.received(event);
+ }
+
+ void received(NetworkEvent event)
+ {
+ event.delegate(this);
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ emit(0, header);
+ }
+
+ public void error(ProtocolError error)
+ {
+ emit(0, error);
+ }
+
+ public void frame(Frame frame)
+ {
+ ByteBuffer segment;
+ if (frame.isFirstFrame() && frame.isLastFrame())
+ {
+ segment = frame.getBody();
+ assemble(frame, segment);
+ }
+ else
+ {
+ List<Frame> frames;
+ if (frame.isFirstFrame())
+ {
+ frames = new ArrayList<>();
+ setSegment(frame, frames);
+ }
+ else
+ {
+ frames = getSegment(frame);
+ }
+
+ frames.add(frame);
+
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+
+ int size = 0;
+ for (Frame f : frames)
+ {
+ size += f.getSize();
+ }
+ segment = allocateByteBuffer(size);
+ for (Frame f : frames)
+ {
+ segment.put(f.getBody());
+ }
+ segment.flip();
+ assemble(frame, segment);
+ }
+ }
+ }
+
+ private ByteBuffer allocateByteBuffer(final int size)
+ {
+ return ByteBuffer.allocate(size);
+ }
+
+ private void assemble(Frame frame, ByteBuffer segment)
+ {
+ BBDecoder dec = _decoder.get();
+ dec.init(segment);
+
+ int channel = frame.getChannel();
+ Method command;
+
+ switch (frame.getType())
+ {
+ case CONTROL:
+ int controlType = dec.readUint16();
+ Method control = Method.create(controlType);
+ control.read(dec);
+ emit(channel, control);
+ break;
+ case COMMAND:
+ int commandType = dec.readUint16();
+ // read in the session header, right now we don't use it
+ int hdr = dec.readUint16();
+ command = Method.create(commandType);
+ command.setSync((0x0001 & hdr) != 0);
+ command.read(dec);
+ if (command.hasPayload() && !frame.isLastSegment())
+ {
+ setIncompleteCommand(channel, command);
+ }
+ else
+ {
+ emit(channel, command);
+ }
+ break;
+ case HEADER:
+ command = getIncompleteCommand(channel);
+ List<Struct> structs = null;
+ DeliveryProperties deliveryProps = null;
+ MessageProperties messageProps = null;
+
+ while (dec.hasRemaining())
+ {
+ Struct struct = dec.readStruct32();
+ if (struct instanceof DeliveryProperties && deliveryProps == null)
+ {
+ deliveryProps = (DeliveryProperties) struct;
+ }
+ else if (struct instanceof MessageProperties && messageProps == null)
+ {
+ messageProps = (MessageProperties) struct;
+ }
+ else
+ {
+ if (structs == null)
+ {
+ structs = new ArrayList<>(2);
+ }
+ structs.add(struct);
+ }
+ }
+ command.setHeader(new Header(deliveryProps, messageProps, structs));
+
+ if (frame.isLastSegment())
+ {
+ setIncompleteCommand(channel, null);
+ emit(channel, command);
+ }
+ break;
+ case BODY:
+ command = getIncompleteCommand(channel);
+ command.setBody(QpidByteBuffer.wrap(segment));
+ setIncompleteCommand(channel, null);
+ emit(channel, command);
+ break;
+ default:
+ throw new IllegalStateException("unknown frame type: " + frame.getType());
+ }
+
+ dec.releaseBuffer();
+ }
+
+ private void setIncompleteCommand(int channelId, Method incomplete)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ _incompleteMethodArray[channelId] = incomplete;
+ }
+ else
+ {
+ if (incomplete != null)
+ {
+ _incompleteMethodMap.put(channelId, incomplete);
+ }
+ else
+ {
+ _incompleteMethodMap.remove(channelId);
+ }
+ }
+ }
+
+ private Method getIncompleteCommand(int channelId)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ return _incompleteMethodArray[channelId];
+ }
+ else
+ {
+ return _incompleteMethodMap.get(channelId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
new file mode 100644
index 0000000..d7b54b0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
+
+public class ConnectionInteraction
+{
+ public static final String SASL_MECHANISM_ANONYMOUS = "ANONYMOUS";
+ public static final String SASL_MECHANISM_PLAIN = "PLAIN";
+
+ private final Interaction _interaction;
+ private ConnectionStartOk _startOk;
+ private ConnectionTuneOk _tuneOk;
+ private ConnectionOpen _open;
+
+ public ConnectionInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ _startOk = new ConnectionStartOk();
+ _tuneOk = new ConnectionTuneOk();
+ _open = new ConnectionOpen();
+ }
+
+ public Interaction startOk() throws Exception
+ {
+ return _interaction.sendPerformative(_startOk);
+ }
+
+ public ConnectionInteraction startOkMechanism(final String mechanism)
+ {
+ _startOk.setMechanism(mechanism);
+ return this;
+ }
+
+ public Interaction tuneOk() throws Exception
+ {
+ return _interaction.sendPerformative(_tuneOk);
+ }
+
+ public Interaction open() throws Exception
+ {
+ return _interaction.sendPerformative(_open);
+ }
+
+ public ConnectionInteraction tuneOkChannelMax(final int channelMax)
+ {
+ _tuneOk.setChannelMax(channelMax);
+ return this;
+ }
+
+ public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize)
+ {
+ _tuneOk.setMaxFrameSize(maxFrameSize);
+ return this;
+ }
+
+ public ConnectionInteraction startOkResponse(final byte[] response)
+ {
+ _startOk.setResponse(response);
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
new file mode 100644
index 0000000..e60049e
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.lang.Math.min;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_SEG;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.HEADER_SIZE;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.FrameSizeObserver;
+import org.apache.qpid.server.protocol.v0_10.ProtocolEventSender;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.transport.ByteBufferSender;
+
+/**
+ * Disassembler
+ */
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(Disassembler.class);
+ private final ByteBufferSender _sender;
+ private final Object _sendlock = new Object();
+ private volatile int _maxPayload;
+ private final static ThreadLocal<BBEncoder> _encoder = new ThreadLocal<BBEncoder>()
+ {
+ public BBEncoder initialValue()
+ {
+ return new BBEncoder(4 * 1024);
+ }
+ };
+
+ public Disassembler(ByteBufferSender sender, int maxFrame)
+ {
+ _sender = sender;
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ _maxPayload = maxFrame - HEADER_SIZE;
+ }
+
+ public void send(ProtocolEvent event)
+ {
+ event.delegate(null, this);
+ }
+
+ public void flush()
+ {
+ synchronized (_sendlock)
+ {
+ _sender.flush();
+ }
+ }
+
+ public void close()
+ {
+ synchronized (_sendlock)
+ {
+ _sender.close();
+ }
+ }
+
+ public void init(Void v, ProtocolHeader header)
+ {
+ synchronized (_sendlock)
+ {
+ _sender.send(header.toByteBuffer());
+ _sender.flush();
+ }
+ }
+
+ public void control(Void v, Method method)
+ {
+ method(method, SegmentType.CONTROL);
+ }
+
+ public void command(Void v, Method method)
+ {
+ method(method, SegmentType.COMMAND);
+ }
+
+ private void method(Method method, SegmentType type)
+ {
+ BBEncoder enc = _encoder.get();
+ enc.init();
+ enc.writeUint16(method.getEncodedType());
+ if (type == SegmentType.COMMAND)
+ {
+ if (method.isSync())
+ {
+ enc.writeUint16(0x0101);
+ }
+ else
+ {
+ enc.writeUint16(0x0100);
+ }
+ }
+ method.write(enc);
+ int methodLimit = enc.position();
+
+ byte flags = FIRST_SEG;
+
+ boolean payload = method.hasPayload();
+ if (!payload)
+ {
+ flags |= LAST_SEG;
+ }
+
+ int headerLimit = -1;
+ if (payload)
+ {
+ final Header hdr = method.getHeader();
+ if (hdr != null)
+ {
+ if(hdr.getDeliveryProperties() != null)
+ {
+ enc.writeStruct32(hdr.getDeliveryProperties());
+ }
+ if(hdr.getMessageProperties() != null)
+ {
+ enc.writeStruct32(hdr.getMessageProperties());
+ }
+ if(hdr.getNonStandardProperties() != null)
+ {
+ for (Struct st : hdr.getNonStandardProperties())
+ {
+ enc.writeStruct32(st);
+ }
+ }
+ }
+ headerLimit = enc.position();
+ }
+
+ synchronized (_sendlock)
+ {
+ ByteBuffer buf = enc.underlyingBuffer();
+ buf.flip();
+ ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+ copy.put(buf.duplicate());
+ copy.flip();
+
+ final ByteBuffer methodBuf = view(copy,0, methodLimit);
+ fragment(flags, type, method, methodBuf);
+ if (payload)
+ {
+ QpidByteBuffer qpidByteBuffer = method.getBody();
+ ByteBuffer body = null;
+ if (qpidByteBuffer != null)
+ {
+ body = ByteBuffer.allocate(qpidByteBuffer.remaining());
+ qpidByteBuffer.copyTo(body);
+ }
+ ByteBuffer headerBuf = view(copy, methodLimit, headerLimit);
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerBuf);
+ if (body != null)
+ {
+ fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+ }
+ }
+ }
+ }
+
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buffer)
+ {
+ byte typeb = (byte) type.getValue();
+ byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+ int remaining = buffer.remaining();
+ boolean first = true;
+ while (true)
+ {
+ int size = min(_maxPayload, remaining);
+ remaining -= size;
+
+ byte newflags = flags;
+ if (first)
+ {
+ newflags |= FIRST_FRAME;
+ first = false;
+ }
+ if (remaining == 0)
+ {
+ newflags |= LAST_FRAME;
+ }
+
+ frame(newflags, typeb, track, event.getChannel(), size, buffer);
+
+ if (remaining == 0)
+ {
+ break;
+ }
+ }
+ }
+
+ private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buffer)
+ {
+ ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+
+ data.put(0, flags);
+ data.put(1, type);
+ data.putShort(2, (short) (size + HEADER_SIZE));
+ data.put(4, (byte) 0);
+ data.put(5, track);
+ data.putShort(6, (short) channel);
+
+ try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(data))
+ {
+ _sender.send(qpidByteBuffer);
+ }
+
+ if(size > 0)
+ {
+ final ByteBuffer view = view(buffer, 0, size);
+ try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(view))
+ {
+ _sender.send(qpidByteBuffer);
+ }
+ buffer.position(buffer.position() + size);
+ }
+ }
+
+ public void error(Void v, ProtocolError error)
+ {
+ throw new IllegalArgumentException(String.valueOf(error));
+ }
+
+ @Override
+ public void setMaxFrameSize(final int maxFrame)
+ {
+ if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+ {
+ throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+ }
+ _maxPayload = maxFrame - HEADER_SIZE;
+
+ }
+
+ private static ByteBuffer view(ByteBuffer buffer, int offset, int length)
+ {
+ ByteBuffer view = buffer.slice();
+ view.position(offset);
+ int newLimit = Math.min(view.position() + length, view.capacity());
+ view.limit(newLimit);
+ return view.slice();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
new file mode 100644
index 0000000..fa79489
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.tests.protocol.Response;
+
+public class ErrorResponse implements Response<ProtocolError>
+{
+ private final ProtocolError _error;
+
+ public ErrorResponse(final ProtocolError protocolError)
+ {
+ _error = protocolError;
+ }
+
+ @Override
+ public ProtocolError getBody()
+ {
+ return _error;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
new file mode 100644
index 0000000..2e6817b
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionSync;
+
+public class ExecutionInteraction
+{
+ private final Interaction _interaction;
+ private final ExecutionSync _sync;
+
+ public ExecutionInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ _sync = new ExecutionSync();
+ }
+
+ public ExecutionInteraction syncId(final int id)
+ {
+ _sync.setId(id);
+ return this;
+ }
+
+ public Interaction sync() throws Exception
+ {
+ _interaction.sendPerformative(_sync);
+ return _interaction;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
new file mode 100644
index 0000000..fff894b
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.apache.qpid.server.transport.util.Functions.str;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+
+ private final ProtocolEventReceiver _receiver;
+
+ public enum State
+ {
+ PROTO_HDR,
+ FRAME_HDR,
+ FRAME_BODY,
+ ERROR
+ }
+
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final Assembler _assembler;
+
+ private int _maxFrameSize = 4096;
+ private State _state;
+ private ByteBuffer input = null;
+ private int _needed;
+
+ private byte _flags;
+ private SegmentType _type;
+ private byte _track;
+ private int _channel;
+
+ FrameDecoder(final byte[] headerBytes)
+ {
+ _receiver = new ProtocolEventReceiver(headerBytes);
+ this._assembler = new Assembler(_receiver);
+ this._state = State.PROTO_HDR;
+ _needed = 8;
+
+ }
+
+ @Override
+ public Collection<Response<?>> decode(final ByteBuffer buf) throws Exception
+ {
+ int limit = buf.limit();
+ int remaining = buf.remaining();
+ while (remaining > 0)
+ {
+ if (remaining >= _needed)
+ {
+ int consumed = _needed;
+ int pos = buf.position();
+ if (input == null)
+ {
+ buf.limit(pos + _needed);
+ input = buf;
+ _state = next(pos);
+ buf.limit(limit);
+ buf.position(pos + consumed);
+ }
+ else
+ {
+ buf.limit(pos + _needed);
+ input.put(buf);
+ buf.limit(limit);
+ input.flip();
+ _state = next(0);
+ }
+
+ remaining -= consumed;
+ input = null;
+ }
+ else
+ {
+ if (input == null)
+ {
+ input = ByteBuffer.allocate(_needed);
+ }
+ input.put(buf);
+ _needed -= remaining;
+ remaining = 0;
+ }
+ }
+ return _receiver.getReceivedEvents();
+ }
+
+ private State next(int pos)
+ {
+ input.order(ByteOrder.BIG_ENDIAN);
+
+ switch (_state) {
+ case PROTO_HDR:
+ if (input.get(pos) != 'A' &&
+ input.get(pos + 1) != 'M' &&
+ input.get(pos + 2) != 'Q' &&
+ input.get(pos + 3) != 'P')
+ {
+ error("bad protocol header: %s", str(input));
+ return State.ERROR;
+ }
+
+ byte protoClass = input.get(pos + 4);
+ byte instance = input.get(pos + 5);
+ byte major = input.get(pos + 6);
+ byte minor = input.get(pos + 7);
+ _assembler.received(new ProtocolHeader(protoClass, instance, major, minor));
+ _needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ case FRAME_HDR:
+ _flags = input.get(pos);
+ _type = SegmentType.get(input.get(pos + 1));
+ int size = (0xFFFF & input.getShort(pos + 2));
+ size -= Frame.HEADER_SIZE;
+ _maxFrameSize = 64 * 1024;
+ if (size < 0 || size > (_maxFrameSize - 12))
+ {
+ error("bad frame size: %d", size);
+ return State.ERROR;
+ }
+ byte b = input.get(pos + 5);
+ if ((b & 0xF0) != 0) {
+ error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: '%x'", b);
+ return State.ERROR;
+ } else {
+ _track = (byte) (b & 0xF);
+ }
+ _channel = (0xFFFF & input.getShort(pos + 6));
+ if (size == 0)
+ {
+ Frame frame = new Frame(_flags, _type, _track, _channel, EMPTY_BYTE_BUFFER);
+ _assembler.received(frame);
+ _needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ }
+ else
+ {
+ _needed = size;
+ return State.FRAME_BODY;
+ }
+ case FRAME_BODY:
+ Frame frame = new Frame(_flags, _type, _track, _channel, input.slice());
+ _assembler.received(frame);
+ _needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private void error(String fmt, Object ... args)
+ {
+ _assembler.received(new ProtocolError(Frame.L1, fmt, args));
+ }
+
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ _maxFrameSize = maxFrameSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
new file mode 100644
index 0000000..dfec4f4
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+ @Override
+ public ByteBuffer encode(final Object msg)
+ {
+ if (msg instanceof ProtocolEvent)
+ {
+ final List<ByteBuffer> buffers = new ArrayList<>();
+ final AtomicInteger totalSize = new AtomicInteger();
+ Disassembler disassembler = new Disassembler(new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ int remaining = msg.remaining();
+ byte[] data = new byte[remaining];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+ msg.get(data);
+ buffers.add(byteBuffer);
+ totalSize.addAndGet(remaining);
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ }, 512);
+
+ disassembler.send((ProtocolEvent) msg);
+ ByteBuffer data = ByteBuffer.allocate(totalSize.get());
+ for (ByteBuffer buffer : buffers)
+ {
+ data.put(buffer);
+ }
+ data.flip();
+ return data;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
new file mode 100644
index 0000000..3b7849c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+
+import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+
+
+public class FrameTransport extends AbstractFrameTransport<Interaction>
+{
+ private final byte[] _protocolHeader;
+
+ public FrameTransport(final InetSocketAddress brokerAddress)
+ {
+ super(brokerAddress, new FrameDecoder(new ProtocolEngineCreator_0_10().getHeaderIdentifier()), new FrameEncoder());
+ _protocolHeader = new ProtocolEngineCreator_0_10().getHeaderIdentifier();
+ }
+
+ @Override
+ public byte[] getProtocolHeader()
+ {
+ return _protocolHeader;
+ }
+
+ @Override
+ public Interaction newInteraction()
+ {
+ return new Interaction(this);
+ }
+
+ @Override
+ public FrameTransport connect()
+ {
+ super.connect();
+ return this;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
new file mode 100644
index 0000000..5d53a89
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
+
+public class Interaction extends AbstractInteraction<Interaction>
+{
+ private ConnectionInteraction _connectionInteraction;
+ private SessionInteraction _sessionInteraction;
+ private MessageInteraction _messageInteraction;
+ private ExecutionInteraction _executionInteraction;
+ private int _channelId;
+ private TxInteraction _txInteraction;
+
+ public Interaction(final AbstractFrameTransport frameTransport)
+ {
+ super(frameTransport);
+ _connectionInteraction = new ConnectionInteraction(this);
+ _sessionInteraction = new SessionInteraction(this);
+ _messageInteraction = new MessageInteraction(this);
+ _executionInteraction = new ExecutionInteraction(this);
+ _txInteraction = new TxInteraction(this);
+ }
+
+ @Override
+ protected byte[] getProtocolHeader()
+ {
+ return getTransport().getProtocolHeader();
+ }
+
+ public <T extends Method> Interaction sendPerformative(final T performative) throws Exception
+ {
+ performative.setChannel(_channelId);
+ sendPerformativeAndChainFuture(copyPerformative(performative));
+ return this;
+ }
+
+ public ConnectionInteraction connection()
+ {
+ return _connectionInteraction;
+ }
+
+ private <T extends Method> T copyPerformative(final T src)
+ {
+ T dst = (T) Method.create(src.getStructType());
+ final BBEncoder encoder = new BBEncoder(4096);
+ encoder.init();
+ src.write(encoder);
+ ByteBuffer buffer = encoder.buffer();
+
+ final BBDecoder decoder = new BBDecoder();
+ decoder.init(buffer);
+ dst.read(decoder);
+ return dst;
+ }
+
+ public Interaction openAnonymousConnection() throws Exception
+ {
+ this.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse(ConnectionTune.class)
+ .connection().tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOk.class);
+ return this;
+ }
+
+ public SessionInteraction session()
+ {
+ return _sessionInteraction;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public Interaction channelId(final int channelId)
+ {
+ _channelId = channelId;
+ return this;
+ }
+
+ public Interaction attachSession(final byte[] sessionName) throws Exception
+ {
+ this.session()
+ .attachName(sessionName)
+ .attach()
+ .consumeResponse(SessionAttached.class)
+ .session().commandPointCommandId(0).commandPoint();
+ return this;
+ }
+
+ public MessageInteraction message()
+ {
+ return _messageInteraction;
+ }
+
+ public ExecutionInteraction execution()
+ {
+ return _executionInteraction;
+ }
+
+ public TxInteraction tx()
+ {
+ return _txInteraction;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
new file mode 100644
index 0000000..4660c86
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageFlow;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageSubscribe;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+
+public class MessageInteraction
+{
+ private final Interaction _interaction;
+ private MessageTransfer _transfer;
+ private MessageSubscribe _subscribe;
+ private MessageFlow _flow;
+ private MessageAccept _accept;
+
+ public MessageInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ _transfer = new MessageTransfer();
+ _subscribe = new MessageSubscribe();
+ _flow = new MessageFlow();
+ _accept = new MessageAccept();
+ }
+
+ public MessageInteraction transferId(final int id)
+ {
+ _transfer.setId(id);
+ return this;
+ }
+
+ public MessageInteraction transferDesitnation(final String destination)
+ {
+ _transfer.setDestination(destination);
+ return this;
+ }
+
+ public Interaction transfer() throws Exception
+ {
+ _interaction.sendPerformative(_transfer);
+ return _interaction;
+ }
+
+ public MessageInteraction subscribeQueue(final String queueName)
+ {
+ _subscribe.setQueue(queueName);
+ return this;
+ }
+
+ public MessageInteraction subscribeId(final int id)
+ {
+ _subscribe.setId(id);
+ return this;
+ }
+
+ public Interaction subscribe() throws Exception
+ {
+ return _interaction.sendPerformative(_subscribe);
+ }
+
+ public MessageInteraction subscribeDestination(final String destination)
+ {
+ _subscribe.setDestination(destination);
+ return this;
+ }
+
+ public Interaction flow() throws Exception
+ {
+ return _interaction.sendPerformative(_flow);
+ }
+
+ public MessageInteraction flowId(final int id)
+ {
+ _flow.setId(id);
+ return this;
+ }
+
+ public MessageInteraction flowDestination(final String destination)
+ {
+ _flow.setDestination(destination);
+ return this;
+ }
+
+ public MessageInteraction flowUnit(final MessageCreditUnit unit)
+ {
+ _flow.setUnit(unit);
+ return this;
+ }
+
+ public MessageInteraction flowValue(final long value)
+ {
+ _flow.setValue(value);
+ return this;
+ }
+
+ public MessageInteraction subscribeAcceptMode(final MessageAcceptMode acceptMode)
+ {
+ _subscribe.setAcceptMode(acceptMode);
+ return this;
+ }
+
+ public MessageInteraction subscribeAcquireMode(final MessageAcquireMode acquireMode)
+ {
+ _subscribe.setAcquireMode(acquireMode);
+ return this;
+ }
+
+ public Interaction accept() throws Exception
+ {
+ return _interaction.sendPerformative(_accept);
+ }
+
+ public MessageInteraction acceptId(final int id)
+ {
+ _accept.setId(id);
+ return this;
+ }
+
+ public MessageInteraction acceptTransfers(final RangeSet transfers)
+ {
+ _accept.setTransfers(transfers);
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
new file mode 100644
index 0000000..701e92c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.tests.protocol.Response;
+
+public class PerformativeResponse implements Response<Method>
+{
+ private Method _method;
+
+ public PerformativeResponse(final Method method)
+ {
+ _method = method;
+ }
+
+ @Override
+ public Method getBody()
+ {
+ return _method;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PerformativeResponse{" +
+ "_method=" + _method +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
new file mode 100644
index 0000000..37eb66e
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+
+public class ProtocolEventReceiver
+{
+ private Queue<Response<?>> _events = new ConcurrentLinkedQueue<>();
+ private final byte[] _headerBytes;
+
+ public ProtocolEventReceiver(final byte[] headerBytes)
+ {
+ _headerBytes = headerBytes;
+ }
+
+ void received(ProtocolEvent msg)
+ {
+ if (msg instanceof ProtocolHeader)
+ {
+ _events.add(new HeaderResponse(_headerBytes));
+ }
+ else if (msg instanceof Method)
+ {
+ _events.add(new PerformativeResponse((Method) msg));
+ }
+ else if (msg instanceof ProtocolError)
+ {
+ _events.add(new ErrorResponse((ProtocolError) msg));
+ }
+ }
+
+ public Collection<Response<?>> getReceivedEvents()
+ {
+ Collection<Response<?>> results = new ArrayList<>(_events);
+ _events.removeAll(results);
+ return results;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
new file mode 100644
index 0000000..fce711d
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+
+public class SessionInteraction
+{
+ private final Interaction _interaction;
+ private SessionAttach _attach;
+ private SessionDetach _detach;
+ private SessionCommandPoint _commandPoint;
+ private SessionFlush _flush;
+
+ public SessionInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ _attach = new SessionAttach();
+ _detach = new SessionDetach();
+ _commandPoint = new SessionCommandPoint();
+ _flush = new SessionFlush();
+ }
+
+ public Interaction attach() throws Exception
+ {
+ return _interaction.sendPerformative(_attach);
+ }
+
+ public SessionInteraction attachName(final byte[] name)
+ {
+ _attach.setName(name);
+ return this;
+ }
+
+ public Interaction detach() throws Exception
+ {
+ return _interaction.sendPerformative(_detach);
+ }
+
+ public SessionInteraction detachName(final byte[] sessionName)
+ {
+ _detach.setName(sessionName);
+ return this;
+ }
+
+ public Interaction commandPoint() throws Exception
+ {
+ return _interaction.sendPerformative(_commandPoint);
+ }
+
+ public SessionInteraction commandPointCommandId(final int commandId)
+ {
+ _commandPoint.setCommandId(commandId);
+ return this;
+ }
+
+ public Interaction flush() throws Exception
+ {
+ return _interaction.sendPerformative(_flush);
+ }
+
+ public SessionInteraction flushCompleted()
+ {
+ _flush.setCompleted(true);
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
new file mode 100644
index 0000000..14c9912
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.TxCommit;
+import org.apache.qpid.server.protocol.v0_10.transport.TxSelect;
+
+public class TxInteraction
+{
+ private final Interaction _interaction;
+ private final TxSelect _select;
+ private final TxCommit _commit;
+
+ public TxInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ _select = new TxSelect();
+ _commit = new TxCommit();
+ }
+
+ public Interaction select() throws Exception
+ {
+ return _interaction.sendPerformative(_select);
+ }
+
+ public TxInteraction selectId(final int id)
+ {
+ _select.setId(id);
+ return this;
+ }
+
+ public TxInteraction commitId(final int id)
+ {
+ _commit.setId(id);
+ return this;
+ }
+
+ public Interaction commit() throws Exception
+ {
+ return _interaction.sendPerformative(_commit);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
new file mode 100644
index 0000000..69387fb
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+ "name" : "${broker.name}",
+ "modelVersion" : "7.0",
+ "authenticationproviders" : [ {
+ "name" : "anon",
+ "type" : "Anonymous"
+ }, {
+ "name" : "plain",
+ "type" : "Plain",
+ "secureOnlyMechanisms" : [],
+ "users" : [ {
+ "name" : "admin",
+ "type" : "managed",
+ "password" : "admin"
+ }, {
+ "name" : "guest",
+ "type" : "managed",
+ "password" : "guest"
+ } ]
+ } ],
+ "ports" : [ {
+ "name" : "AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "plain",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_10" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias"
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias"
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias"
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_10" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ } ],
+ "virtualhostnodes" : []
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
new file mode 100644
index 0000000..1072f7c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ConnectionTest extends BrokerAdminUsingTestBase
+{
+ private static final String DEFAULT_LOCALE = "en_US";
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "4.3. Version Negotiation",
+ description = "When the client opens a new socket connection to an AMQP server,"
+ + " it MUST send a protocol header with the client's preferred protocol version."
+ + "If the requested protocol version is supported, the server MUST send its own protocol"
+ + " header with the requested version to the socket, and then implement the protocol accordingly")
+ public void versionNegotiation() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse();
+ assertThat(response, is(instanceOf(HeaderResponse.class)));
+ assertThat(response.getBody(), is(IsEqual.equalTo(transport.getProtocolHeader())));
+
+ ConnectionStart connectionStart = interaction.consumeResponse().getLatestResponse(ConnectionStart.class);
+ assertThat(connectionStart.getMechanisms(), is(notNullValue()));
+ assertThat(connectionStart.getMechanisms(), contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS));
+ assertThat(connectionStart.getLocales(), is(notNullValue()));
+ assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "9.connection.start-ok",
+ description = "An AMQP client MUST handle incoming connection.start controls.")
+ public void startOk() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse().getLatestResponse(ConnectionTune.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "9.connection.tune-ok",
+ description = "This control sends the client's connection tuning parameters to the server."
+ + " Certain fields are negotiated, others provide capability information.")
+ public void tuneOkAndOpen() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse(ConnectionTune.class)
+ .connection().tuneOk()
+ .connection().open()
+ .consumeResponse().getLatestResponse(ConnectionOpenOk.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "9",
+ description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+ public void authenticationBypassBySendingTuneOk() throws Exception
+ {
+ InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().tuneOk()
+ .connection().open()
+ .consumeResponse().getLatestResponse(ConnectionClose.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "9",
+ description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+ public void authenticationBypassBySendingOpen() throws Exception
+ {
+ InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse().consumeResponse(ConnectionStart.class)
+ .connection().open()
+ .consumeResponse().getLatestResponse(ConnectionClose.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "9",
+ description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+ public void authenticationBypassAfterSendingStartOk() throws Exception
+ {
+ InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try(FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk().consumeResponse(ConnectionSecure.class)
+ .connection().tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "9.connection.tune-ok.minimum",
+ description = "[...] the minimum negotiated value for max-frame-size is also MIN-MAX-FRAME-SIZE [4096]")
+ public void tooSmallFrameSize() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+ interaction.connection().tuneOkChannelMax(response.getChannelMax())
+ .tuneOkMaxFrameSize(1024)
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "9.connection.tune-ok.max-frame-size",
+ description = "If the client specifies a channel max that is higher than the value provided by the server,"
+ + " the server MUST close the connection without attempting a negotiated close."
+ + " The server may report the error in some fashion to assist implementers.")
+ public void tooLargeFrameSize() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+ assumeThat(response.hasMaxFrameSize(), is(true));
+ assumeThat(response.getMaxFrameSize(), is(lessThan(0xFFFF)));
+ interaction.connection().tuneOkChannelMax(response.getChannelMax())
+ .tuneOkMaxFrameSize(response.getMaxFrameSize() + 1)
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org