You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/18 01:42:19 UTC
[1/3] qpid-broker-j git commit: [Broker-J][System Tests] Add protocol
system test suites for AMQP 0-8, 0-9 and 0-9-1
Repository: qpid-broker-j
Updated Branches:
refs/heads/master a3c00bbfc -> c6d80d80e
[Broker-J][System Tests] Add protocol system test suites for AMQP 0-8,0-9 and 0-9-1
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c6d80d80
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c6d80d80
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c6d80d80
Branch: refs/heads/master
Commit: c6d80d80e685f70653f609951868a9e8f0ffb5a6
Parents: 06e53d7
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:32:22 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Sat Nov 18 01:38:53 2017 +0000
----------------------------------------------------------------------
.../qpid/server/protocol/v0_8/AMQDecoder.java | 4 +-
.../server/protocol/v0_8/ServerDecoder.java | 2 +-
.../server/protocol/v0_8/ClientDecoder.java | 2 +-
pom.xml | 7 +
systests/protocol-tests-amqp-0-8/pom.xml | 109 +++++++
.../tests/protocol/v0_8/BasicInteraction.java | 214 ++++++++++++
.../tests/protocol/v0_8/ChannelInteraction.java | 51 +++
.../qpid/tests/protocol/v0_8/ClientDecoder.java | 323 +++++++++++++++++++
.../protocol/v0_8/ConnectionInteraction.java | 103 ++++++
.../qpid/tests/protocol/v0_8/FrameDecoder.java | 111 +++++++
.../qpid/tests/protocol/v0_8/FrameEncoder.java | 83 +++++
.../tests/protocol/v0_8/FrameTransport.java | 101 ++++++
.../qpid/tests/protocol/v0_8/Interaction.java | 109 +++++++
.../protocol/v0_8/PerformativeResponse.java | 54 ++++
.../tests/protocol/v0_8/QueueInteraction.java | 63 ++++
.../resources/config-protocol-tests-0-8.json | 78 +++++
.../qpid/tests/protocol/v0_8/BasicTest.java | 156 +++++++++
.../qpid/tests/protocol/v0_8/ChannelTest.java | 54 ++++
.../tests/protocol/v0_8/ConnectionTest.java | 183 +++++++++++
.../qpid/tests/protocol/v0_8/QueueTest.java | 67 ++++
20 files changed, 1870 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
index 706f1ae..add9fb1 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQDecoder.java
@@ -217,11 +217,11 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- abstract void processMethod(int channelId,
+ protected abstract void processMethod(int channelId,
QpidByteBuffer in)
throws AMQFrameDecodingException;
- AMQFrameDecodingException newUnknownMethodException(final int classId,
+ protected AMQFrameDecodingException newUnknownMethodException(final int classId,
final int methodId,
ProtocolVersion protocolVersion)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
index 08ccda9..59d4985 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerDecoder.java
@@ -46,7 +46,7 @@ public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends Se
@Override
- void processMethod(int channelId,
+ protected void processMethod(int channelId,
QpidByteBuffer in)
throws AMQFrameDecodingException
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
index de8afb9..8e02bbb 100644
--- a/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
+++ b/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ClientDecoder.java
@@ -86,7 +86,7 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
}
@Override
- void processMethod(int channelId,
+ protected void processMethod(int channelId,
QpidByteBuffer in)
throws AMQFrameDecodingException
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 62e7033..e7d58bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
<module>systests/systests-utils</module>
<module>systests/qpid-systests-jms_2.0</module>
<module>systests/protocol-tests-core</module>
+ <module>systests/protocol-tests-amqp-0-8</module>
<module>systests/protocol-tests-amqp-1-0</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
@@ -420,6 +421,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-amqp-0-8</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/pom.xml b/systests/protocol-tests-amqp-0-8/pom.xml
new file mode 100644
index 0000000..3f788db
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.1.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-amqp-0-8</artifactId>
+ <name>Apache Qpid Protocol Tests for AMQP 0-8,0-9,0-9-1</name>
+ <description>Tests for AMQP 0-8,0-9,0-9-1</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-test-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-logging-logback</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-memory-store</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-derby-store</artifactId>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-bdbstore</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-8.json</qpid.initialConfigurationLocation>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
new file mode 100644
index 0000000..9ef66ec
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosBody;
+import org.apache.qpid.server.protocol.v0_8.transport.CompositeAMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+
+public class BasicInteraction
+{
+ private final Interaction _interaction;
+ private String _publishExchange;
+ private String _publishRoutingKey;
+ private boolean _publishMandatory;
+ private boolean _publishImmediate;
+ private byte[] _content;
+ private Map<String, Object> _contentHeaderPropertiesHeaders = new HashMap<>();
+ private String _contentHeaderPropertiesContentType;
+ private byte _contentHeaderPropertiesDeliveryMode;
+ private byte _contentHeaderPropertiesPriority;
+ private int _qosPrefetchCount;
+ private long _qosPrefetchSize;
+ private boolean _qosGlobal;
+ private String _consumeQueueName;
+ private String _consumeConsumerTag;
+ private boolean _consumeNoLocal;
+ private boolean _consumeNoAck;
+ private boolean _consumeExclusive;
+ private boolean _consumeNoWait;
+ private Map<String, Object> _consumeArguments = new HashMap<>();
+ private long _ackDeliveryTag;
+ private boolean _ackMultiple;
+
+ public BasicInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ }
+
+ public Interaction publish() throws Exception
+ {
+ return _interaction.sendPerformative(new BasicPublishBody(0,
+ AMQShortString.valueOf(_publishExchange),
+ AMQShortString.valueOf(_publishRoutingKey),
+ _publishMandatory,
+ _publishImmediate));
+ }
+
+ public BasicInteraction content(final String content)
+ {
+ _content = content.getBytes(StandardCharsets.UTF_8);
+ return this;
+ }
+
+ public BasicInteraction content(final byte[] content)
+ {
+ _content = content;
+ return this;
+ }
+
+ public BasicInteraction contentHeaderPropertiesHeaders(final Map<String, Object> messageHeaders)
+ {
+ _contentHeaderPropertiesHeaders = messageHeaders;
+ return this;
+ }
+
+ public BasicInteraction contentHeaderPropertiesContentType(final String messageContentType)
+ {
+ _contentHeaderPropertiesContentType = messageContentType;
+ return this;
+ }
+
+ public BasicInteraction contentHeaderPropertiesPriority(final byte priority)
+ {
+ _contentHeaderPropertiesPriority = priority;
+ return this;
+ }
+
+ public BasicInteraction contentHeaderPropertiesDeliveryMode(final byte deliveryMode)
+ {
+ _contentHeaderPropertiesDeliveryMode = deliveryMode;
+ return this;
+ }
+
+
+ public Interaction publishMessage() throws Exception
+ {
+ List<AMQFrame> frames = new ArrayList<>();
+ BasicPublishBody publishFrame = new BasicPublishBody(0,
+ AMQShortString.valueOf(_publishExchange),
+ AMQShortString.valueOf(_publishRoutingKey),
+ _publishMandatory,
+ _publishImmediate);
+ frames.add(new AMQFrame(_interaction.getChannelId(), publishFrame));
+ final BasicContentHeaderProperties basicContentHeaderProperties = new BasicContentHeaderProperties();
+ basicContentHeaderProperties.setHeaders(FieldTable.convertToFieldTable(_contentHeaderPropertiesHeaders));
+ basicContentHeaderProperties.setContentType(_contentHeaderPropertiesContentType);
+ basicContentHeaderProperties.setDeliveryMode(_contentHeaderPropertiesDeliveryMode);
+ basicContentHeaderProperties.setPriority(_contentHeaderPropertiesPriority);
+ final int contentSize = _content == null ? 0 : _content.length;
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody(basicContentHeaderProperties, contentSize);
+ frames.add(new AMQFrame(_interaction.getChannelId(), contentHeaderBody));
+ if (contentSize > 0)
+ {
+ final int framePayloadMax = _interaction.getMaximumFrameSize() - 8;
+ int offset = 0;
+ do
+ {
+ int contentToCopyLength = Math.min(framePayloadMax, contentSize - offset);
+ ContentBody contentBody = new ContentBody(ByteBuffer.wrap(_content, offset,
+ contentToCopyLength));
+ frames.add(new AMQFrame(_interaction.getChannelId(), contentBody));
+ offset += contentToCopyLength;
+ }
+ while (offset < contentSize);
+ }
+
+ CompositeAMQDataBlock frame = new CompositeAMQDataBlock(frames.toArray(new AMQFrame[frames.size()]));
+
+ return _interaction.sendPerformative(frame);
+ }
+
+ public BasicInteraction publishExchange(final String exchangeName)
+ {
+ _publishExchange = exchangeName;
+ return this;
+ }
+
+ public BasicInteraction publishRoutingKey(final String queueName)
+ {
+ _publishRoutingKey = queueName;
+ return this;
+ }
+
+ public BasicInteraction qosPrefetchCount(final int prefetchCount)
+ {
+ _qosPrefetchCount = prefetchCount;
+ return this;
+ }
+
+ public Interaction qos() throws Exception
+ {
+ return _interaction.sendPerformative(new BasicQosBody(_qosPrefetchSize,
+ _qosPrefetchCount,
+ _qosGlobal));
+ }
+
+ public Interaction consume() throws Exception
+ {
+ return _interaction.sendPerformative(new BasicConsumeBody(0,
+ AMQShortString.valueOf(_consumeQueueName),
+ AMQShortString.valueOf(_consumeConsumerTag),
+ _consumeNoLocal,
+ _consumeNoAck,
+ _consumeExclusive,
+ _consumeNoWait,
+ FieldTable.convertToFieldTable(_consumeArguments)));
+ }
+
+ public BasicInteraction consumeConsumerTag(final String consumerTag)
+ {
+ _consumeConsumerTag = consumerTag;
+ return this;
+ }
+
+ public BasicInteraction consumeQueue(final String queueName)
+ {
+ _consumeQueueName = queueName;
+ return this;
+ }
+
+ public Interaction ack() throws Exception
+ {
+ return _interaction.sendPerformative(new BasicAckBody(_ackDeliveryTag, _ackMultiple));
+ }
+
+ public BasicInteraction ackDeliveryTag(final long deliveryTag)
+ {
+ _ackDeliveryTag = deliveryTag;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
new file mode 100644
index 0000000..51d4426
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenBody;
+
+public class ChannelInteraction
+{
+ private Interaction _interaction;
+
+ public ChannelInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ }
+
+ public Interaction open() throws Exception
+ {
+ return _interaction.sendPerformative(new ChannelOpenBody());
+ }
+
+ public Interaction close() throws Exception
+ {
+ return _interaction.sendPerformative(new ChannelCloseBody(200, AMQShortString.valueOf(""), 0, 0));
+ }
+
+ public Interaction flow(final boolean active) throws Exception
+ {
+ return _interaction.sendPerformative(new ChannelFlowBody(active));
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java
new file mode 100644
index 0000000..2bd9d27
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ClientDecoder.java
@@ -0,0 +1,323 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.AMQDecoder;
+import org.apache.qpid.server.protocol.v0_8.AMQFrameDecodingException;
+import org.apache.qpid.server.protocol.v0_8.transport.*;
+
+public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
+{
+ private QpidByteBuffer _incompleteBuffer;
+
+ /**
+ * Creates a new AMQP decoder.
+ *
+ * @param methodProcessor method processor
+ */
+ public ClientDecoder(final ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor)
+ {
+ super(false, methodProcessor);
+ }
+
+ public void decodeBuffer(ByteBuffer incomingBuffer) throws AMQFrameDecodingException, AMQProtocolVersionException
+ {
+ if (_incompleteBuffer == null)
+ {
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(incomingBuffer);
+ final int required = decode(qpidByteBuffer);
+ if (required != 0)
+ {
+ _incompleteBuffer = QpidByteBuffer.allocate(qpidByteBuffer.remaining() + required);
+ _incompleteBuffer.put(qpidByteBuffer);
+ }
+ qpidByteBuffer.dispose();
+ }
+ else
+ {
+ if (incomingBuffer.remaining() < _incompleteBuffer.remaining())
+ {
+ _incompleteBuffer.put(incomingBuffer);
+ }
+ else
+ {
+ _incompleteBuffer.flip();
+ final QpidByteBuffer aggregatedBuffer =
+ QpidByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining());
+ aggregatedBuffer.put(_incompleteBuffer);
+ aggregatedBuffer.put(incomingBuffer);
+ aggregatedBuffer.flip();
+ final int required = decode(aggregatedBuffer);
+
+ _incompleteBuffer.dispose();
+ if (required != 0)
+ {
+ _incompleteBuffer = QpidByteBuffer.allocate(aggregatedBuffer.remaining() + required);
+ _incompleteBuffer.put(aggregatedBuffer);
+ }
+ else
+ {
+ _incompleteBuffer = null;
+ }
+ aggregatedBuffer.dispose();
+ }
+ }
+ // post-condition: assert(!incomingBuffer.hasRemaining());
+ }
+
+ @Override
+ protected void processMethod(int channelId,
+ QpidByteBuffer in)
+ throws AMQFrameDecodingException
+ {
+ ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor();
+ ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
+ final int classAndMethod = in.getInt();
+ int classId = classAndMethod >> 16;
+ int methodId = classAndMethod & 0xFFFF;
+ methodProcessor.setCurrentMethod(classId, methodId);
+ try
+ {
+ switch (classAndMethod)
+ {
+ //CONNECTION_CLASS:
+ case 0x000a000a:
+ ConnectionStartBody.process(in, methodProcessor);
+ break;
+ case 0x000a0014:
+ ConnectionSecureBody.process(in, methodProcessor);
+ break;
+ case 0x000a001e:
+ ConnectionTuneBody.process(in, methodProcessor);
+ break;
+ case 0x000a0029:
+ ConnectionOpenOkBody.process(in, methodProcessor);
+ break;
+ case 0x000a002a:
+ ConnectionRedirectBody.process(in, methodProcessor);
+ break;
+ case 0x000a0032:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+ {
+ ConnectionRedirectBody.process(in, methodProcessor);
+ }
+ else
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ break;
+ case 0x000a0033:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ else
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ break;
+ case 0x000a003c:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+ {
+ ConnectionCloseBody.process(in, methodProcessor);
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ break;
+ case 0x000a003d:
+ if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
+ {
+ methodProcessor.receiveConnectionCloseOk();
+ }
+ else
+ {
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+ }
+ break;
+
+ // CHANNEL_CLASS:
+
+ case 0x0014000b:
+ ChannelOpenOkBody.process(in, methodProcessor.getProtocolVersion(), channelMethodProcessor);
+ break;
+ case 0x00140014:
+ ChannelFlowBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140015:
+ ChannelFlowOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x0014001e:
+ ChannelAlertBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140028:
+ ChannelCloseBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00140029:
+ channelMethodProcessor.receiveChannelCloseOk();
+ break;
+
+ // ACCESS_CLASS:
+
+ case 0x001e000b:
+ AccessRequestOkBody.process(in, channelMethodProcessor);
+ break;
+
+ // EXCHANGE_CLASS:
+
+ case 0x0028000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveExchangeDeclareOk();
+ }
+ break;
+ case 0x00280015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveExchangeDeleteOk();
+ }
+ break;
+ case 0x00280017:
+ ExchangeBoundOkBody.process(in, channelMethodProcessor);
+ break;
+
+
+ // QUEUE_CLASS:
+
+ case 0x0032000b:
+ QueueDeclareOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveQueueBindOk();
+ }
+ break;
+ case 0x0032001f:
+ QueuePurgeOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320029:
+ QueueDeleteOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x00320033:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveQueueUnbindOk();
+ }
+ break;
+
+
+ // BASIC_CLASS:
+
+ case 0x003c000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicQosOk();
+ }
+ break;
+ case 0x003c0015:
+ BasicConsumeOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c001f:
+ BasicCancelOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0032:
+ BasicReturnBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c003c:
+ BasicDeliverBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0047:
+ BasicGetOkBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0048:
+ BasicGetEmptyBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0050:
+ BasicAckBody.process(in, channelMethodProcessor);
+ break;
+ case 0x003c0065:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicRecoverSyncOk();
+ }
+ break;
+ case 0x003c006f:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveBasicRecoverSyncOk();
+ }
+ break;
+ case 0x003c0078:
+ BasicNackBody.process(in, channelMethodProcessor);
+ break;
+
+ // CONFIRM CLASS:
+
+ case 0x0055000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveConfirmSelectOk();
+ }
+ break;
+
+ // TX_CLASS:
+
+ case 0x005a000b:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxSelectOk();
+ }
+ break;
+ case 0x005a0015:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxCommitOk();
+ }
+ break;
+ case 0x005a001f:
+ if(!channelMethodProcessor.ignoreAllButCloseOk())
+ {
+ channelMethodProcessor.receiveTxRollbackOk();
+ }
+ break;
+
+ default:
+ throw newUnknownMethodException(classId, methodId,
+ methodProcessor.getProtocolVersion());
+
+ }
+ }
+ finally
+ {
+ methodProcessor.setCurrentMethod(0, 0);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
new file mode 100644
index 0000000..023e7fc
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody;
+
+public class ConnectionInteraction
+{
+ private final Interaction _interaction;
+
+ private final Map<String, Object> _startOkClientProperties = new HashMap<>();
+ private String _startOkMechanism;
+ private byte[] _startOkResponse;
+ private String _startOkLocale;
+ private int _tuneOkChannelMax;
+ private long _tuneOkFrameMax;
+ private int _tuneOkHeartbeat;
+ private String _openVirtualHost;
+
+ public ConnectionInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ }
+
+
+ public ConnectionInteraction startOkMechanism(final String startOkMechanism)
+ {
+ _startOkMechanism = startOkMechanism;
+ return this;
+ }
+
+
+ public Interaction startOk() throws Exception
+ {
+ return _interaction.sendPerformative(new ConnectionStartOkBody(FieldTable.convertToFieldTable(_startOkClientProperties),
+ AMQShortString.valueOf(_startOkMechanism),
+ _startOkResponse,
+ AMQShortString.valueOf(_startOkLocale)));
+ }
+
+ public ConnectionInteraction tuneOkChannelMax(final int channelMax)
+ {
+ _tuneOkChannelMax = channelMax;
+ return this;
+ }
+
+ public ConnectionInteraction tuneOkFrameMax(final long frameMax)
+ {
+ _tuneOkFrameMax = frameMax;
+ return this;
+ }
+
+ public ConnectionInteraction tuneOkHeartbeat(final int heartbeat)
+ {
+ _tuneOkHeartbeat = heartbeat;
+ return this;
+ }
+
+ public Interaction tuneOk() throws Exception
+ {
+ return _interaction.sendPerformative(new ConnectionTuneOkBody(_tuneOkChannelMax,
+ _tuneOkFrameMax,
+ _tuneOkHeartbeat));
+ }
+
+ public ConnectionInteraction openVirtualHost(String virtualHost)
+ {
+ _openVirtualHost = virtualHost;
+ return this;
+ }
+
+ public Interaction open() throws Exception
+ {
+ return _interaction.sendPerformative(new ConnectionOpenBody(AMQShortString.valueOf(_openVirtualHost),
+ null,
+ false));
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java
new file mode 100644
index 0000000..499fe72
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameDecoder.java
@@ -0,0 +1,111 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.server.protocol.v0_8.transport.FrameCreatingMethodProcessor;
+import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+ private final ClientDecoder _clientDecoder;
+ private final FrameCreatingMethodProcessor _methodProcessor;
+
+ FrameDecoder(ProtocolVersion protocolVersion)
+ {
+ _methodProcessor = new FrameCreatingMethodProcessor(protocolVersion);
+ _clientDecoder = new ClientDecoder(_methodProcessor);
+ }
+
+ @Override
+ public Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception
+ {
+ _clientDecoder.decodeBuffer(inputBuffer);
+
+ List<AMQDataBlock> receivedFrames = new ArrayList<>(_methodProcessor.getProcessedMethods());
+ List<Response<?>> result = new ArrayList<>();
+
+ for (AMQDataBlock frame : receivedFrames)
+ {
+ if (frame instanceof AMQFrame)
+ {
+ AMQFrame amqFrame = (AMQFrame) frame;
+ result.add(new PerformativeResponse(amqFrame.getChannel(), amqFrame.getSize(), amqFrame.getBodyFrame()));
+ }
+ else if (frame instanceof ProtocolInitiation)
+ {
+ byte[] data = new byte[(int) frame.getSize()];
+ frame.writePayload(new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ msg.copyTo(data);
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
+
+ result.add(new HeaderResponse(data));
+ }
+ else
+ {
+ throw new IllegalArgumentException(String.format("Unexpected data block received %s", frame));
+ }
+ }
+ _methodProcessor.getProcessedMethods().removeAll(receivedFrames);
+ return result;
+ }
+
+ ProtocolVersion getVersion()
+ {
+ return _methodProcessor.getProtocolVersion();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java
new file mode 100644
index 0000000..9b471fc
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameEncoder.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+
+ @Override
+ public ByteBuffer encode(final Object msg)
+ {
+ if (msg instanceof AMQDataBlock)
+ {
+ final List<ByteBuffer>buffers = new ArrayList<>();
+ ((AMQDataBlock)msg).writePayload(new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ byte[] data = new byte[msg.remaining()];
+ msg.get(data);
+ buffers.add(ByteBuffer.wrap(data));
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
+ int remaining = 0;
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ remaining += byteBuffer.remaining();
+ }
+ ByteBuffer result = ByteBuffer.allocate(remaining);
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ result.put(byteBuffer);
+ }
+ result.flip();
+ return result;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
new file mode 100644
index 0000000..52cd7a0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.net.InetSocketAddress;
+
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+
+
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
+{
+ private final byte[] _protocolHeader;
+ private ProtocolVersion _protocolVersion;
+
+ public FrameTransport(final InetSocketAddress brokerAddress)
+ {
+ this(brokerAddress, Protocol.AMQP_0_9_1);
+ }
+ public FrameTransport(final InetSocketAddress brokerAddress, Protocol protocol)
+ {
+ super(brokerAddress, new FrameDecoder(getProtocolVersion(protocol)), new FrameEncoder());
+ _protocolVersion = getProtocolVersion(protocol);
+ byte[] protocolHeader = null;
+ for(ProtocolEngineCreator installedEngine : (new QpidServiceLoader()).instancesOf(ProtocolEngineCreator.class))
+ {
+ if (installedEngine.getVersion() == protocol)
+ {
+ protocolHeader = installedEngine.getHeaderIdentifier();
+ }
+ }
+
+ if (protocolHeader == null)
+ {
+ throw new IllegalArgumentException(String.format("Unsupported protocol %s", protocol));
+ }
+ _protocolHeader = protocolHeader;
+ }
+
+ @Override
+ public FrameTransport connect()
+ {
+ super.connect();
+ return this;
+ }
+
+ public Interaction newInteraction()
+ {
+ return new Interaction(this);
+ }
+
+ public byte[] getProtocolHeader()
+ {
+ return _protocolHeader;
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
+ }
+
+ public static ProtocolVersion getProtocolVersion(Protocol protocol)
+ {
+ final ProtocolVersion protocolVersion;
+ switch (protocol)
+ {
+ case AMQP_0_8:
+ protocolVersion = ProtocolVersion.v0_8;
+ break;
+ case AMQP_0_9_1:
+ protocolVersion = ProtocolVersion.v0_91;
+ break;
+ case AMQP_0_9:
+ protocolVersion = ProtocolVersion.v0_9;
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unsupported protocol %s", protocol));
+ }
+ return protocolVersion;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
new file mode 100644
index 0000000..0b62770
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
+{
+
+ private int _channelId;
+ private int _maximumPayloadSize = 512;
+
+ Interaction(final FrameTransport transport)
+ {
+ super(transport);
+ }
+
+ @Override
+ protected byte[] getProtocolHeader()
+ {
+ return getTransport().getProtocolHeader();
+ }
+
+ @Override
+ protected Interaction getInteraction()
+ {
+ return this;
+ }
+
+ public Interaction sendPerformative(final AMQBody amqBody) throws Exception
+ {
+ return sendPerformative(getChannelId(), amqBody);
+ }
+
+ public Interaction sendPerformative(int channel, final AMQBody amqBody) throws Exception
+ {
+ final AMQFrame frameBody = new AMQFrame(channel, amqBody);
+ sendPerformativeAndChainFuture(frameBody, false);
+ return this;
+ }
+
+ public Interaction sendPerformative(final AMQDataBlock dataBlock) throws Exception
+ {
+ sendPerformativeAndChainFuture(dataBlock, false);
+ return this;
+ }
+
+ public Interaction openAnonymousConnection() throws Exception
+ {
+ return this.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS").startOk().consumeResponse(ConnectionTuneBody.class)
+ .connection().tuneOk()
+ .connection().open().consumeResponse(ConnectionOpenOkBody.class);
+
+ }
+
+ public ConnectionInteraction connection()
+ {
+ return new ConnectionInteraction(this);
+ }
+
+ public ChannelInteraction channel()
+ {
+ return new ChannelInteraction(this);
+ }
+
+ public QueueInteraction queue()
+ {
+ return new QueueInteraction(this);
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public int getMaximumFrameSize()
+ {
+ return _maximumPayloadSize;
+ }
+
+ public BasicInteraction basic()
+ {
+ return new BasicInteraction(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java
new file mode 100644
index 0000000..66871e9
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/PerformativeResponse.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v0_8;
+
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
+import org.apache.qpid.tests.protocol.Response;
+
+public class PerformativeResponse implements Response<AMQBody>
+{
+ private final int _channel;
+ private final long _size;
+ private final AMQBody _body;
+
+ public PerformativeResponse(int channel, long size, final AMQBody body)
+ {
+ _channel = channel;
+ _size = size;
+ _body = body;
+ }
+
+ @Override
+ public AMQBody getBody()
+ {
+ return _body;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PerformativeResponse{" +
+ "_channel=" + _channel +
+ ", _size=" + _size +
+ ", _body=" + _body +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
new file mode 100644
index 0000000..6e86385
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody;
+
+public class QueueInteraction
+{
+ private Interaction _interaction;
+ private String _declareName;
+ private boolean _declarePassive;
+ private boolean _declareDurable;
+ private boolean _declareExclusive;
+ private boolean _declareAutoDelete;
+ private boolean _declareNowait;
+ private Map<String, Object> _declareArguments = new HashMap<>();
+
+ public QueueInteraction(final Interaction interaction)
+ {
+ _interaction = interaction;
+ }
+
+ public QueueInteraction declareName(String name)
+ {
+ _declareName = name;
+ return this;
+ }
+
+ public Interaction declare() throws Exception
+ {
+ return _interaction.sendPerformative(new QueueDeclareBody(0,
+ AMQShortString.valueOf(_declareName),
+ _declarePassive,
+ _declareDurable,
+ _declareExclusive,
+ _declareAutoDelete,
+ _declareNowait,
+ FieldTable.convertToFieldTable(_declareArguments)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json b/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json
new file mode 100644
index 0000000..d3738c9
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/main/resources/config-protocol-tests-0-8.json
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+ "name" : "${broker.name}",
+ "modelVersion" : "7.0",
+ "authenticationproviders" : [ {
+ "name" : "anon",
+ "type" : "Anonymous"
+ }, {
+ "name" : "plain",
+ "type" : "Plain",
+ "secureOnlyMechanisms" : [],
+ "users" : [ {
+ "name" : "admin",
+ "type" : "managed",
+ "password" : "admin"
+ }, {
+ "name" : "guest",
+ "type" : "managed",
+ "password" : "guest"
+ } ]
+ } ],
+ "ports" : [ {
+ "name" : "AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "plain",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias"
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias"
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias"
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_8", "AMQP_0_9", "AMQP_0_9_1" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ } ],
+ "virtualhostnodes" : []
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
new file mode 100644
index 0000000..eb51925
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class BasicTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ @SpecificationTest(section = "1.8.3.7", description = "publish a message")
+ public void publishMessage() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .basic().contentHeaderPropertiesContentType("text/plain")
+ .contentHeaderPropertiesHeaders(Collections.singletonMap("test", "testValue"))
+ .contentHeaderPropertiesDeliveryMode((byte)1)
+ .contentHeaderPropertiesPriority((byte)1)
+ .publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .content("Test")
+ .publishMessage()
+ .channel().close()
+ .consumeResponse(ChannelCloseOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "1.8.3.3", description = " start a queue consumer")
+ public void consumeMessage() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String messageContent = "Test";
+ String consumerTag = "A";
+ String queueName = BrokerAdmin.TEST_QUEUE_NAME;
+ Map<String, Object> messageHeaders = Collections.singletonMap("test", "testValue");
+ String messageContentType = "text/plain";
+ byte deliveryMode = (byte) 1;
+ byte priority = (byte) 2;
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .basic().qosPrefetchCount(1).qos().consumeResponse(BasicQosOkBody.class)
+ .basic().consumeConsumerTag(consumerTag)
+ .consumeQueue(queueName)
+ .consume().consumeResponse(BasicConsumeOkBody.class)
+ .channel().flow(true).consumeResponse(ChannelFlowOkBody.class)
+ .basic().contentHeaderPropertiesContentType(messageContentType)
+ .contentHeaderPropertiesHeaders(messageHeaders)
+ .contentHeaderPropertiesDeliveryMode(deliveryMode)
+ .contentHeaderPropertiesPriority(priority)
+ .publishExchange("")
+ .publishRoutingKey(queueName)
+ .content(messageContent)
+ .publishMessage()
+ .consumeResponse(BasicDeliverBody.class);
+
+ BasicDeliverBody delivery = interaction.getLatestResponse(BasicDeliverBody.class);
+ assertThat(delivery.getConsumerTag(), is(equalTo(AMQShortString.valueOf(consumerTag))));
+ assertThat(delivery.getConsumerTag(), is(notNullValue()));
+ assertThat(delivery.getRedelivered(), is(equalTo(false)));
+ assertThat(delivery.getExchange(), is(nullValue()));
+ assertThat(delivery.getRoutingKey(), is(equalTo(AMQShortString.valueOf(queueName))));
+
+ ContentHeaderBody header =
+ interaction.consumeResponse(ContentHeaderBody.class).getLatestResponse(ContentHeaderBody.class);
+
+ assertThat(header.getBodySize(), is(equalTo((long)messageContent.length())));
+ BasicContentHeaderProperties properties = header.getProperties();
+ Map<String, Object> receivedHeaders = new HashMap<>(FieldTable.convertToMap(properties.getHeaders()));
+ assertThat(receivedHeaders, is(equalTo(new HashMap<>(messageHeaders))));
+ assertThat(properties.getContentTypeAsString(), is(equalTo(messageContentType)));
+ assertThat(properties.getPriority(), is(equalTo(priority)));
+ assertThat(properties.getDeliveryMode(), is(equalTo(deliveryMode)));
+
+ ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
+
+ QpidByteBuffer payload = content.getPayload();
+ byte[] contentData = new byte[payload.remaining()];
+ payload.get(contentData);
+ payload.dispose();
+ String receivedContent = new String(contentData, StandardCharsets.UTF_8);
+
+ assertThat(receivedContent, is(equalTo(messageContent)));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1)));
+
+ interaction.basic().ackDeliveryTag(delivery.getDeliveryTag())
+ .ack()
+ .channel().close().consumeResponse(ChannelCloseOkBody.class);
+ assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(0)));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
new file mode 100644
index 0000000..4ca4ae8
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ChannelTest.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ChannelTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
+ public void channelOpen() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
new file mode 100644
index 0000000..1fbbf2a
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class ConnectionTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
+ public void connectionStart() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionStartBody response =
+ interaction.negotiateProtocol().consumeResponse().getLatestResponse(ConnectionStartBody.class);
+
+ assertThat(response.getVersionMajor(), is(equalTo((short)transport.getProtocolVersion().getMajorVersion())));
+ assertThat(response.getVersionMinor(), is(equalTo((short)transport.getProtocolVersion().getActualMinorVersion())));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.2", description = "select security mechanism and locale")
+ public void connectionStartOk() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse();
+
+ interaction.getLatestResponse(ConnectionTuneBody.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.5", description = "select security mechanism and locale")
+ public void connectionTuneOkAndOpen() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTuneBody response = interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+ interaction.connection().tuneOkChannelMax(response.getChannelMax())
+ .tuneOkFrameMax(response.getFrameMax())
+ .tuneOkHeartbeat(response.getHeartbeat())
+ .tuneOk()
+ .connection().open()
+ .consumeResponse().getLatestResponse(ConnectionOpenOkBody.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.5", description = "[...] the minimum negotiated value for frame-max is also"
+ + " frame-min-size [4096].")
+ public void tooSmallFrameSize() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTuneBody response = interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+ interaction.connection().tuneOkChannelMax(response.getChannelMax())
+ .tuneOkFrameMax(1024)
+ .tuneOkHeartbeat(response.getHeartbeat())
+ .tuneOk()
+ .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.5.2.", description = "If the client specifies a frame max that is higher than"
+ + " the value provided by the server, the server MUST"
+ + " close the connection without attempting a negotiated"
+ + " close. The server may report the error in some fashion"
+ + " to assist implementors.")
+ public void tooLargeFrameSize() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTuneBody response = interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+ interaction.connection().tuneOkChannelMax(response.getChannelMax())
+ .tuneOkFrameMax(Long.MAX_VALUE)
+ .tuneOkHeartbeat(response.getHeartbeat())
+ .tuneOk()
+ .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK"
+ + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
+ public void authenticationBypassBySendingTuneOk() throws Exception
+ {
+ final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+ .connection().tuneOk()
+ .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "1.4.", description = "open connection = C:protocolheader S:START C:START OK"
+ + " *challenge S:TUNE C:TUNE OK C:OPEN S:OPEN OK")
+ public void authenticationBypassBySendingOpen() throws Exception
+ {
+ final InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse(ConnectionStartBody.class)
+ .connection().open()
+ .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c6d80d80/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
new file mode 100644
index 0000000..920ea73
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+public class QueueTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "1.4.2.1", description = "start connection negotiation")
+ public void queueDeclare() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final String queueName = "testQueue";
+ QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declareName(queueName).declare()
+ .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+ assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(queueName))));
+ assertThat(response.getMessageCount(), is(equalTo(0L)));
+ assertThat(response.getConsumerCount(), is(equalTo(0L)));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests]
Introduce new module 'protocol-tests-core' and move test common functionality
into it
Posted by or...@apache.org.
QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/06e53d7a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/06e53d7a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/06e53d7a
Branch: refs/heads/master
Commit: 06e53d7a5f6cb942a2706dffe340018b659aa077
Parents: a3c00bb
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:31:29 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Sat Nov 18 01:38:53 2017 +0000
----------------------------------------------------------------------
pom.xml | 13 +
systests/protocol-tests-amqp-1-0/pom.xml | 35 +--
.../qpid/tests/protocol/v1_0/FrameDecoder.java | 276 +++++++++++++++++
.../qpid/tests/protocol/v1_0/FrameEncoder.java | 93 ++++++
.../tests/protocol/v1_0/FrameTransport.java | 205 +------------
.../tests/protocol/v1_0/HeaderResponse.java | 46 ---
.../qpid/tests/protocol/v1_0/InputHandler.java | 305 -------------------
.../qpid/tests/protocol/v1_0/Interaction.java | 141 +++------
.../qpid/tests/protocol/v1_0/Matchers.java | 60 ----
.../qpid/tests/protocol/v1_0/OutputHandler.java | 96 ------
.../protocol/v1_0/PerformativeResponse.java | 1 +
.../qpid/tests/protocol/v1_0/Response.java | 25 --
.../protocol/v1_0/SaslPerformativeResponse.java | 1 +
.../tests/protocol/v1_0/SpecificationTest.java | 34 ---
.../tests/protocol/v1_0/DecodeErrorTest.java | 2 +
.../bindmapjms/TemporaryDestinationTest.java | 4 +-
.../extensions/websocket/WebSocketTest.java | 13 +-
.../v1_0/messaging/DeleteOnCloseTest.java | 2 +-
.../protocol/v1_0/messaging/MessageFormat.java | 4 +-
.../v1_0/messaging/MultiTransferTest.java | 4 +-
.../protocol/v1_0/messaging/OutcomeTest.java | 4 +-
.../protocol/v1_0/messaging/TransferTest.java | 11 +-
.../v1_0/transaction/DischargeTest.java | 2 +-
.../transaction/TransactionalTransferTest.java | 4 +-
.../v1_0/transport/ProtocolHeaderTest.java | 2 +-
.../v1_0/transport/connection/OpenTest.java | 7 +-
.../v1_0/transport/link/AttachTest.java | 2 +-
.../protocol/v1_0/transport/link/FlowTest.java | 2 +-
.../transport/link/ResumeDeliveriesTest.java | 12 +-
.../v1_0/transport/security/sasl/SaslTest.java | 2 +-
.../v1_0/transport/session/BeginTest.java | 8 +-
systests/protocol-tests-core/pom.xml | 75 +++++
.../qpid/tests/protocol/FrameTransport.java | 198 ++++++++++++
.../qpid/tests/protocol/HeaderResponse.java | 46 +++
.../qpid/tests/protocol/InputDecoder.java | 30 ++
.../qpid/tests/protocol/InputHandler.java | 81 +++++
.../apache/qpid/tests/protocol/Interaction.java | 141 +++++++++
.../apache/qpid/tests/protocol/Matchers.java | 60 ++++
.../qpid/tests/protocol/OutputEncoder.java | 29 ++
.../qpid/tests/protocol/OutputHandler.java | 69 +++++
.../apache/qpid/tests/protocol/Response.java | 25 ++
.../qpid/tests/protocol/SpecificationTest.java | 34 +++
42 files changed, 1280 insertions(+), 924 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c6bbfcb..62e7033 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
<module>systests</module>
<module>systests/systests-utils</module>
<module>systests/qpid-systests-jms_2.0</module>
+ <module>systests/protocol-tests-core</module>
<module>systests/protocol-tests-amqp-1-0</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
@@ -407,6 +408,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-amqp-1-0</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index 5200e2d..aa25351 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -53,6 +53,11 @@
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-systests-utils</artifactId>
</dependency>
@@ -91,38 +96,9 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
<scope>test</scope>
- <optional>true</optional>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
</dependency>
@@ -130,6 +106,7 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-integration</artifactId>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
new file mode 100644
index 0000000..4dc06cb
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(FrameDecoder.class);
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+ private final MyConnectionHandler _connectionHandler;
+ private volatile FrameHandler _frameHandler;
+
+ private enum ParsingState
+ {
+ HEADER,
+ PERFORMATIVES;
+ }
+
+ private final ValueHandler _valueHandler;
+
+ private volatile ParsingState _state = ParsingState.HEADER;
+
+ public FrameDecoder(final boolean isSasl)
+ {
+ _valueHandler = new ValueHandler(TYPE_REGISTRY);
+ _connectionHandler = new MyConnectionHandler();
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
+ }
+
+ @Override
+ public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
+ {
+ List<Response<?>> responses = new ArrayList<>();
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
+ switch(_state)
+ {
+ case HEADER:
+ if (inputBuffer.remaining() >= 8)
+ {
+ byte[] header = new byte[8];
+ inputBuffer.get(header);
+ responses.add(new HeaderResponse(header));
+ _state = ParsingState.PERFORMATIVES;
+ _frameHandler.parse(qpidByteBuffer);
+ }
+ break;
+ case PERFORMATIVES:
+ _frameHandler.parse(qpidByteBuffer);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state : " + _state);
+ }
+
+ Response<?> r;
+ while((r = _connectionHandler._responseQueue.poll())!=null)
+ {
+ responses.add(r);
+ }
+ return responses;
+ }
+
+ private void resetInputHandlerAfterSaslOutcome()
+ {
+ _state = ParsingState.HEADER;
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+ }
+
+ private class MyConnectionHandler implements ConnectionHandler
+ {
+ private volatile int _frameSize = 512;
+ private Queue<Response<?>> _responseQueue = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void receiveOpen(final int channel, final Open close)
+ {
+ }
+
+ @Override
+ public void receiveClose(final int channel, final Close close)
+ {
+
+ }
+
+ @Override
+ public void receiveBegin(final int channel, final Begin begin)
+ {
+
+ }
+
+ @Override
+ public void receiveEnd(final int channel, final End end)
+ {
+
+ }
+
+ @Override
+ public void receiveAttach(final int channel, final Attach attach)
+ {
+
+ }
+
+ @Override
+ public void receiveDetach(final int channel, final Detach detach)
+ {
+
+ }
+
+ @Override
+ public void receiveTransfer(final int channel, final Transfer transfer)
+ {
+
+ }
+
+ @Override
+ public void receiveDisposition(final int channel, final Disposition disposition)
+ {
+
+ }
+
+ @Override
+ public void receiveFlow(final int channel, final Flow flow)
+ {
+
+ }
+
+ @Override
+ public int getMaxFrameSize()
+ {
+ return _frameSize;
+ }
+
+ @Override
+ public int getChannelMax()
+ {
+ return UnsignedShort.MAX_VALUE.intValue();
+ }
+
+ @Override
+ public void handleError(final Error parsingError)
+ {
+ LOGGER.error("Unexpected error {}", parsingError);
+ }
+
+ @Override
+ public boolean closedForInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void receive(final List<ChannelFrameBody> channelFrameBodies)
+ {
+ for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
+ {
+ Response response;
+ Object val = channelFrameBody.getFrameBody();
+ int channel = channelFrameBody.getChannel();
+ if (val instanceof FrameBody)
+ {
+ FrameBody frameBody = (FrameBody) val;
+ if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
+ {
+ _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
+ }
+ response = new PerformativeResponse((short) channel, frameBody);
+ }
+ else if (val instanceof SaslFrameBody)
+ {
+ SaslFrameBody frameBody = (SaslFrameBody) val;
+ response = new SaslPerformativeResponse((short) channel, frameBody);
+
+ if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+ {
+ resetInputHandlerAfterSaslOutcome();
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
+ }
+
+ _responseQueue.add(response);
+ }
+ }
+
+ @Override
+ public void receiveSaslInit(final SaslInit saslInit)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslResponse(final SaslResponse saslResponse)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+ {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
new file mode 100644
index 0000000..56d6e6f
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+
+ @Override
+ public ByteBuffer encode(final Object msg)
+ {
+ if (msg instanceof AMQFrame)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>();
+ FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ byte[] data = new byte[msg.remaining()];
+ msg.get(data);
+ buffers.add(ByteBuffer.wrap(data));
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
+ _frameWriter.send(((AMQFrame) msg));
+
+ int remaining = 0;
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ remaining += byteBuffer.remaining();
+ }
+ ByteBuffer result = ByteBuffer.allocate(remaining);
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ result.put(byteBuffer);
+ }
+ result.flip();
+ return result;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4d53751..dd59757 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -19,58 +19,12 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class FrameTransport implements AutoCloseable
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
{
- public static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout",6000);
- private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
- private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-
- private final EventLoopGroup _workerGroup;
- private final InetSocketAddress _brokerAddress;
- private final boolean _isSasl;
-
- private Channel _channel;
- private volatile boolean _channelClosedSeen = false;
-
public FrameTransport(final InetSocketAddress brokerAddress)
{
this(brokerAddress, false);
@@ -78,165 +32,20 @@ public class FrameTransport implements AutoCloseable
public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
{
- _brokerAddress = brokerAddress;
- _isSasl = isSasl;
- _workerGroup = new NioEventLoopGroup();
- }
-
- public InetSocketAddress getBrokerAddress()
- {
- return _brokerAddress;
+ super(brokerAddress, new FrameDecoder(isSasl), new FrameEncoder());
}
+ @Override
public FrameTransport connect()
{
- try
- {
- Bootstrap b = new Bootstrap();
- b.group(_workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>()
- {
- @Override
- public void initChannel(SocketChannel ch) throws Exception
- {
- ChannelPipeline pipeline = ch.pipeline();
- buildInputOutputPipeline(pipeline);
- }
- });
-
- _channel = b.connect(_brokerAddress).sync().channel();
- _channel.closeFuture().addListener(future ->
- {
- _channelClosedSeen = true;
- _queue.add(CHANNEL_CLOSED_RESPONSE);
- });
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ super.connect();
return this;
}
- protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
- {
- pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
- }
-
@Override
- public void close() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.disconnect().sync();
- _channel.close().sync();
- _channel = null;
- }
- }
- finally
- {
- _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
- }
- }
-
- public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
- {
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(bytes);
- _channel.write(buffer, promise);
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
-
- public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
+ public byte[] getProtocolHeader()
{
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- final TransportFrame transportFrame;
- try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
- {
- final QpidByteBuffer duplicate;
- if (payload == null)
- {
- duplicate = null;
- }
- else
- {
- duplicate = payload.duplicate();
- }
- transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
- _channel.write(transportFrame, promise);
- _channel.flush();
- final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
- if (frameBody instanceof Transfer)
- {
- listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
- }
- if (duplicate != null)
- {
- listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
- }
- return listenableFuture;
- }
- }
-
- public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
- {
- SASLFrame transportFrame = new SASLFrame(frameBody);
- ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
- channelFuture.sync();
- return JdkFutureAdapters.listenInPoolThread(channelFuture);
- }
-
- public <T extends Response<?>> T getNextResponse() throws Exception
- {
- return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- }
-
- public void doCloseConnection() throws Exception
- {
- Close close = new Close();
-
- sendPerformative(close, UnsignedShort.valueOf((short) 0));
- PerformativeResponse response = getNextResponse();
- if (!(response.getBody() instanceof Close))
- {
- throw new IllegalStateException(String.format(
- "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
- }
- }
-
- public void assertNoMoreResponses() throws Exception
- {
- Response response = getNextResponse();
- assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
- }
-
- public void assertNoMoreResponsesAndChannelClosed() throws Exception
- {
- assertNoMoreResponses();
- assertThat(_channelClosedSeen, is(true));
- }
-
- private static class ChannelClosedResponse implements Response<Void>
- {
- @Override
- public String toString()
- {
- return "ChannelClosed";
- }
-
- @Override
- public Void getBody()
- {
- return null;
- }
+ return "AMQP\0\1\0\0".getBytes(UTF_8);
}
public Interaction newInteraction()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
deleted file mode 100644
index 9503113..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-public class HeaderResponse implements Response<byte[]>
-{
- private final byte[] _header;
-
- public HeaderResponse(final byte[] header)
- {
- _header = header;
- }
-
- @Override
- public byte[] getBody()
- {
- return _header;
- }
-
- @Override
- public String toString()
- {
- return "HeaderResponse{" +
- "_header=" + Arrays.toString(_header) +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
deleted file mode 100644
index e3acd24..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.ReferenceCountUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class InputHandler extends ChannelInboundHandlerAdapter
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
- private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
-
- private enum ParsingState
- {
- HEADER,
- PERFORMATIVES
- }
-
- private final MyConnectionHandler _connectionHandler;
- private final ValueHandler _valueHandler;
- private final BlockingQueue<Response<?>> _responseQueue;
-
- private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
- private volatile FrameHandler _frameHandler;
- private volatile ParsingState _state = ParsingState.HEADER;
-
- public InputHandler(final BlockingQueue<Response<?>> queue, final boolean isSasl)
- {
-
- _valueHandler = new ValueHandler(TYPE_REGISTRY);
- _connectionHandler = new MyConnectionHandler();
- _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
-
- _responseQueue = queue;
- }
-
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
- {
- ByteBuf buf = (ByteBuf) msg;
- QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
- qpidBuf.put(buf.nioBuffer());
- qpidBuf.flip();
- LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
-
- if (_inputBuffer.hasRemaining())
- {
- QpidByteBuffer old = _inputBuffer;
- _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
- _inputBuffer.put(old);
- _inputBuffer.put(qpidBuf);
- old.dispose();
- qpidBuf.dispose();
- _inputBuffer.flip();
- }
- else
- {
- _inputBuffer.dispose();
- _inputBuffer = qpidBuf;
- }
-
- doParsing();
-
- LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
-
- if (_inputBuffer.hasRemaining())
- {
- _inputBuffer.compact();
- _inputBuffer.flip();
- }
-
- ReferenceCountUtil.release(msg);
- }
-
- private void doParsing()
- {
- switch(_state)
- {
- case HEADER:
- if (_inputBuffer.remaining() >= 8)
- {
- byte[] header = new byte[8];
- _inputBuffer.get(header);
- _responseQueue.add(new HeaderResponse(header));
- _state = ParsingState.PERFORMATIVES;
- doParsing();
- }
- break;
- case PERFORMATIVES:
- _frameHandler.parse(_inputBuffer);
- break;
- default:
- throw new IllegalStateException("Unexpected state : " + _state);
- }
- }
-
- private void resetInputHandlerAfterSaslOutcome()
- {
- _state = ParsingState.HEADER;
- _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
- }
-
- private class MyConnectionHandler implements ConnectionHandler
- {
- private volatile int _frameSize = 512;
-
- @Override
- public void receiveOpen(final int channel, final Open close)
- {
- }
-
- @Override
- public void receiveClose(final int channel, final Close close)
- {
-
- }
-
- @Override
- public void receiveBegin(final int channel, final Begin begin)
- {
-
- }
-
- @Override
- public void receiveEnd(final int channel, final End end)
- {
-
- }
-
- @Override
- public void receiveAttach(final int channel, final Attach attach)
- {
-
- }
-
- @Override
- public void receiveDetach(final int channel, final Detach detach)
- {
-
- }
-
- @Override
- public void receiveTransfer(final int channel, final Transfer transfer)
- {
-
- }
-
- @Override
- public void receiveDisposition(final int channel, final Disposition disposition)
- {
-
- }
-
- @Override
- public void receiveFlow(final int channel, final Flow flow)
- {
-
- }
-
- @Override
- public int getMaxFrameSize()
- {
- return _frameSize;
- }
-
- @Override
- public int getChannelMax()
- {
- return UnsignedShort.MAX_VALUE.intValue();
- }
-
- @Override
- public void handleError(final Error parsingError)
- {
- LOGGER.error("Unexpected error {}", parsingError);
- }
-
- @Override
- public boolean closedForInput()
- {
- return false;
- }
-
- @Override
- public void receive(final List<ChannelFrameBody> channelFrameBodies)
- {
- for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
- {
- Response response;
- Object val = channelFrameBody.getFrameBody();
- int channel = channelFrameBody.getChannel();
- if (val instanceof FrameBody)
- {
- FrameBody frameBody = (FrameBody) val;
- if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
- {
- _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
- }
- response = new PerformativeResponse((short) channel, frameBody);
- }
- else if (val instanceof SaslFrameBody)
- {
- SaslFrameBody frameBody = (SaslFrameBody) val;
- response = new SaslPerformativeResponse((short) channel, frameBody);
-
- if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
- {
- resetInputHandlerAfterSaslOutcome();
- }
- }
- else
- {
- throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
- }
-
- _responseQueue.add(response);
- }
- }
-
- @Override
- public void receiveSaslInit(final SaslInit saslInit)
- {
-
- }
-
- @Override
- public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
- {
-
- }
-
- @Override
- public void receiveSaslChallenge(final SaslChallenge saslChallenge)
- {
-
- }
-
- @Override
- public void receiveSaslResponse(final SaslResponse saslResponse)
- {
-
- }
-
- @Override
- public void receiveSaslOutcome(final SaslOutcome saslOutcome)
- {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 52518ab..7d73ce8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,8 +20,6 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static com.google.common.util.concurrent.Futures.allAsList;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,21 +28,19 @@ import static org.hamcrest.Matchers.is;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -81,8 +77,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
-public class Interaction
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
{
private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Begin _begin;
@@ -94,14 +91,11 @@ public class Interaction
private final Flow _flow;
private final Transfer _transfer;
private final Disposition _disposition;
- private final FrameTransport _transport;
private final SaslInit _saslInit;
private final SaslResponse _saslResponse;
private byte[] _protocolHeader;
private UnsignedShort _connectionChannel;
private UnsignedShort _sessionChannel;
- private Response<?> _latestResponse;
- private ListenableFuture<?> _latestFuture;
private int _deliveryIdCounter;
private List<Transfer> _latestDelivery;
private Object _decodedLatestDelivery;
@@ -109,10 +103,10 @@ public class Interaction
Interaction(final FrameTransport frameTransport)
{
+ super(frameTransport);
final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
- _transport = frameTransport;
- _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+ _protocolHeader = frameTransport.getProtocolHeader();
_saslInit = new SaslInit();
_saslResponse = new SaslResponse();
@@ -154,6 +148,19 @@ public class Interaction
_disposition.setFirst(UnsignedInteger.ZERO);
}
+ public void doCloseConnection() throws Exception
+ {
+ Close close = new Close();
+
+ sendPerformative(close, UnsignedShort.valueOf((short) 0));
+ Response<?> response = getNextResponse();
+ if (!(response.getBody() instanceof Close))
+ {
+ throw new IllegalStateException(String.format(
+ "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
+ }
+ }
+
/////////////////////////
// Protocol Negotiation //
/////////////////////////
@@ -164,17 +171,15 @@ public class Interaction
return this;
}
- public Interaction negotiateProtocol() throws Exception
+ @Override
+ protected byte[] getProtocolHeader()
+ {
+ return _protocolHeader;
+ }
+
+ @Override
+ protected Interaction getInteraction()
{
- final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
return this;
}
@@ -977,83 +982,35 @@ public class Interaction
private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
+ SASLFrame transportFrame = new SASLFrame(frameBody);
+ sendPerformativeAndChainFuture(transportFrame, true);
}
private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
- }
-
- public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
- {
- sync();
- _latestResponse = _transport.getNextResponse();
- final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
- if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
- || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+ final TransportFrame transportFrame;
+ try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
{
- return this;
- }
- acceptableResponseClasses.remove(null);
- if (_latestResponse != null)
- {
- for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ final QpidByteBuffer duplicate;
+ if (payload == null)
{
- if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- return this;
- }
+ duplicate = null;
+ }
+ else
+ {
+ duplicate = payload.duplicate();
+ }
+ transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+ ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+ if (frameBody instanceof Transfer)
+ {
+ listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+ }
+ if (duplicate != null)
+ {
+ listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
}
}
- throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
- acceptableResponseClasses,
- _latestResponse == null ? null : _latestResponse.getBody()));
- }
-
- public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
- {
- if (_latestFuture != null)
- {
- _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- _latestFuture = null;
- }
- return this;
- }
-
- public Response<?> getLatestResponse() throws Exception
- {
- sync();
- return _latestResponse;
- }
-
- public <T> T getLatestResponse(Class<T> type) throws Exception
- {
- sync();
- if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
- type.getSimpleName(),
- _latestResponse.getBody()));
- }
-
- return (T) _latestResponse.getBody();
}
public Interaction flowHandleFromLinkHandle()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
deleted file mode 100644
index 029dc70..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-
-public class Matchers
-{
- public static Matcher<Response> protocolHeader(byte[] expectedHeader)
- {
- return new BaseMatcher<Response>()
- {
- @Override
- public void describeTo(final Description description)
- {
- description.appendValue(new HeaderResponse(expectedHeader));
- }
-
- @Override
- public boolean matches(final Object o)
- {
- if (o == null)
- {
- return false;
- }
- if (!(o instanceof HeaderResponse))
- {
- return false;
- }
- if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
- {
- return false;
- }
- return true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
deleted file mode 100644
index 68f4322..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class OutputHandler extends ChannelOutboundHandlerAdapter
-{
- private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
-
-
- @Override
- public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
- {
-
- if (msg instanceof AMQFrame)
- {
- FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
- {
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- @Override
- public void send(final QpidByteBuffer msg)
- {
- byte[] data = new byte[msg.remaining()];
- msg.get(data);
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(data);
- try
- {
- OutputHandler.super.write(ctx, buffer, promise);
- }
- catch (Exception e)
- {
- promise.setFailure(e);
- }
- }
-
- @Override
- public void flush()
- {
- }
-
- @Override
- public void close()
- {
-
- }
- });
- _frameWriter.send(((AMQFrame) msg));
- }
- else
- {
- super.write(ctx, msg, promise);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
index 06a64dc..9e03a26 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
@@ -20,6 +20,7 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.tests.protocol.Response;
public class PerformativeResponse implements Response<FrameBody>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
deleted file mode 100644
index a7e341c..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-public interface Response<T>
-{
- T getBody();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
index 08893e0..02ab3c9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.tests.protocol.Response;
public class SaslPerformativeResponse implements Response<SaslFrameBody>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
deleted file mode 100644
index ea3d164..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface SpecificationTest
-{
- String section();
- String description();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 8a150fa..1ca4419 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -54,6 +54,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 817be05..a3270a0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
@@ -108,7 +108,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
interaction.consumeResponse().getLatestResponse(Flow.class);
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index a39b1b3..e85f356 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -39,10 +39,11 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class WebSocketTest extends BrokerAdminUsingTestBase
{
@@ -73,7 +74,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
{
- final Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ final Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse()
.getLatestResponse(Open.class);
@@ -84,7 +86,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -97,7 +99,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
{
- final Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ final Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse()
.getLatestResponse(Open.class);
@@ -108,7 +111,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 6ba9058..36203a4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -48,7 +48,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
public class DeleteOnCloseTest extends BrokerAdminUsingTestBase
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 97fe3bb..1cf1ff0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index bcd155f..0a45410 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -52,9 +52,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 1853446..cc377cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -99,7 +99,7 @@ public class OutcomeTest extends BrokerAdminUsingTestBase
assertThat(secondDeliveryPayload, is(equalTo("message2")));
// verify that no unexpected performative is received by closing
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index eb72532..245c624 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -76,8 +76,8 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -439,7 +439,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.disposition();
// verify that no unexpected performative is received by closing
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -688,8 +688,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
assertThat(isSettled.get(), is(true));
// verify no unexpected performative received by closing the connection
- transport.doCloseConnection();
-
+ interaction.doCloseConnection();
}
}
@@ -796,7 +795,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.transfer()
.sync();
- transport.doCloseConnection();
+ interaction.doCloseConnection();
assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 42f6114..bd7c113 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1496d13..fb61974 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -57,9 +57,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 619e5d9..db91db1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class ProtocolHeaderTest extends BrokerAdminUsingTestBase
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index b3f57c1..ab570da 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class OpenTest extends BrokerAdminUsingTestBase
@@ -77,7 +77,8 @@ public class OpenTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
{
- Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.openContainerId("testContainerId")
.open().consumeResponse()
@@ -88,7 +89,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index c559399..774cc00 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -43,7 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index ed72ba3..1a38cf0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests]
Introduce new module 'protocol-tests-core' and move test common functionality
into it
Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 35ee4e7..ad56ea9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -69,9 +69,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -459,7 +459,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
}
}
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -588,7 +588,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
.disposition();
}
- transport.doCloseConnection();
+ interaction.doCloseConnection();
if (getBrokerAdmin().isQueueDepthSupported())
{
@@ -672,7 +672,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
.dispositionRole(Role.RECEIVER)
.disposition();
- transport.doCloseConnection();
+ interaction.doCloseConnection();
if (getBrokerAdmin().isQueueDepthSupported())
{
@@ -771,7 +771,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
}
while (!settled);
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index cf12b05..6fea928 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -49,7 +49,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class SaslTest extends BrokerAdminUsingTestBase
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 352fd19..24cb435 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -36,10 +36,11 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class BeginTest extends BrokerAdminUsingTestBase
{
@@ -74,7 +75,8 @@ public class BeginTest extends BrokerAdminUsingTestBase
try (FrameTransport transport = new FrameTransport(addr).connect())
{
final UnsignedShort channel = UnsignedShort.valueOf(37);
- Begin responseBegin = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ Begin responseBegin = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.sessionChannel(channel)
@@ -85,7 +87,7 @@ public class BeginTest extends BrokerAdminUsingTestBase
assertThat(responseBegin.getOutgoingWindow(), is(instanceOf(UnsignedInteger.class)));
assertThat(responseBegin.getNextOutgoingId(), is(instanceOf(UnsignedInteger.class)));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/pom.xml b/systests/protocol-tests-core/pom.xml
new file mode 100644
index 0000000..fa561ed
--- /dev/null
+++ b/systests/protocol-tests-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ ~
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.1.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-core</artifactId>
+ <name>Apache Qpid Broker-J Protocol Tests Core</name>
+ <description>Core classes for Apache Qpid protocol tests</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
new file mode 100644
index 0000000..daf500d
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public abstract class FrameTransport implements AutoCloseable
+{
+ public static final long RESPONSE_TIMEOUT =
+ Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+ private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+ private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+ private final EventLoopGroup _workerGroup;
+ private final InetSocketAddress _brokerAddress;
+ private final InputHandler _inputHandler;
+ private final OutputHandler _outputHandler;
+
+ private volatile Channel _channel;
+ private volatile boolean _channelClosedSeen = false;
+
+ public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+ {
+ _brokerAddress = brokerAddress;
+ _inputHandler = new InputHandler(_queue, inputDecoder);
+ _outputHandler = new OutputHandler(outputEncoder);
+ _workerGroup = new NioEventLoopGroup();
+ }
+
+ public InetSocketAddress getBrokerAddress()
+ {
+ return _brokerAddress;
+ }
+
+ public FrameTransport connect()
+ {
+ try
+ {
+ Bootstrap b = new Bootstrap();
+ b.group(_workerGroup);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+ b.handler(new ChannelInitializer<SocketChannel>()
+ {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception
+ {
+ ChannelPipeline pipeline = ch.pipeline();
+ buildInputOutputPipeline(pipeline);
+ }
+ });
+
+ _channel = b.connect(_brokerAddress).sync().channel();
+ _channel.closeFuture().addListener(future ->
+ {
+ _channelClosedSeen = true;
+ _queue.add(CHANNEL_CLOSED_RESPONSE);
+ });
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+ {
+ pipeline.addLast(_inputHandler).addLast(_outputHandler);
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ try
+ {
+ if (_channel != null)
+ {
+ _channel.disconnect().sync();
+ _channel.close().sync();
+ _channel = null;
+ }
+ }
+ finally
+ {
+ _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+ }
+ }
+
+ public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+ {
+ Preconditions.checkState(_channel != null, "Not connected");
+ ChannelPromise promise = _channel.newPromise();
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(bytes);
+ _channel.write(buffer, promise);
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+
+ public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+ {
+ Preconditions.checkState(_channel != null, "Not connected");
+ if (!sync)
+ {
+ ChannelPromise promise = _channel.newPromise();
+ _channel.write(data, promise);
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+ else
+ {
+ ChannelFuture channelFuture = _channel.writeAndFlush(data);
+ channelFuture.sync();
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ public <T extends Response<?>> T getNextResponse() throws Exception
+ {
+ return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public void assertNoMoreResponses() throws Exception
+ {
+ Response response = getNextResponse();
+ assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+ }
+
+ public void assertNoMoreResponsesAndChannelClosed() throws Exception
+ {
+ assertNoMoreResponses();
+ assertThat(_channelClosedSeen, is(true));
+ }
+
+ private static class ChannelClosedResponse implements Response<Void>
+ {
+ @Override
+ public String toString()
+ {
+ return "ChannelClosed";
+ }
+
+ @Override
+ public Void getBody()
+ {
+ return null;
+ }
+ }
+
+ public abstract byte[] getProtocolHeader();
+
+ protected abstract Interaction newInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
new file mode 100644
index 0000000..9767b40
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+public class HeaderResponse implements Response<byte[]>
+{
+ private final byte[] _header;
+
+ public HeaderResponse(final byte[] header)
+ {
+ _header = header;
+ }
+
+ @Override
+ public byte[] getBody()
+ {
+ return _header;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HeaderResponse{" +
+ "_header=" + Arrays.toString(_header) +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
new file mode 100644
index 0000000..369cfd1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+public interface InputDecoder
+{
+ Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
new file mode 100644
index 0000000..2d5fb45
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+
+ private final BlockingQueue<Response<?>> _responseQueue;
+ private final InputDecoder _inputDecoder;
+
+ private ByteBuffer _inputBuffer = ByteBuffer.allocate(0);
+
+ InputHandler(final BlockingQueue<Response<?>> queue, InputDecoder inputDecoder)
+ {
+ _responseQueue = queue;
+ _inputDecoder = inputDecoder;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+ {
+ ByteBuf buf = (ByteBuf) msg;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(buf.readableBytes());
+ byteBuffer.put(buf.nioBuffer());
+ byteBuffer.flip();
+ LOGGER.debug("Incoming {} byte(s)", byteBuffer.remaining());
+
+ if (_inputBuffer.hasRemaining())
+ {
+ ByteBuffer old = _inputBuffer;
+ _inputBuffer = ByteBuffer.allocate(_inputBuffer.remaining() + byteBuffer.remaining());
+ _inputBuffer.put(old);
+ _inputBuffer.put(byteBuffer);
+ _inputBuffer.flip();
+ }
+ else
+ {
+ _inputBuffer = byteBuffer;
+ }
+
+ _responseQueue.addAll(_inputDecoder.decode(_inputBuffer));
+
+ LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
+
+ if (_inputBuffer.hasRemaining())
+ {
+ _inputBuffer.compact();
+ _inputBuffer.flip();
+ }
+
+ ReferenceCountUtil.release(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
new file mode 100644
index 0000000..238c0a5
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class Interaction<I extends Interaction>
+{
+ private final FrameTransport _transport;
+ private ListenableFuture<?> _latestFuture;
+ private Response<?> _latestResponse;
+
+ public Interaction(final FrameTransport frameTransport)
+ {
+ _transport = frameTransport;
+ }
+
+ public I consumeResponse(final Class<?>... responseTypes) throws Exception
+ {
+ sync();
+ _latestResponse = getNextResponse();
+ final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+ if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+ || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+ {
+ return getInteraction();
+ }
+ acceptableResponseClasses.remove(null);
+ if (_latestResponse != null)
+ {
+ for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ {
+ if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ return getInteraction();
+ }
+ }
+ }
+ throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+ acceptableResponseClasses,
+ _latestResponse == null ? null : _latestResponse.getBody()));
+ }
+
+ protected Response<?> getNextResponse() throws Exception
+ {
+ return _transport.getNextResponse();
+ }
+
+ public I sync() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ if (_latestFuture != null)
+ {
+ _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ _latestFuture = null;
+ }
+ return getInteraction();
+ }
+
+ public Response<?> getLatestResponse() throws Exception
+ {
+ sync();
+ return _latestResponse;
+ }
+
+ public <T> T getLatestResponse(Class<T> type) throws Exception
+ {
+ sync();
+ if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+ type.getSimpleName(),
+ _latestResponse.getBody()));
+ }
+
+ return (T) _latestResponse.getBody();
+ }
+
+ protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return future;
+ }
+
+ public I negotiateProtocol() throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return getInteraction();
+ }
+
+ protected FrameTransport getTransport()
+ {
+ return _transport;
+ }
+
+ protected abstract byte[] getProtocolHeader();
+
+ protected abstract I getInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
new file mode 100644
index 0000000..292ae9a
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+public class Matchers
+{
+ public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+ {
+ return new BaseMatcher<Response>()
+ {
+ @Override
+ public void describeTo(final Description description)
+ {
+ description.appendValue(new HeaderResponse(expectedHeader));
+ }
+
+ @Override
+ public boolean matches(final Object o)
+ {
+ if (o == null)
+ {
+ return false;
+ }
+ if (!(o instanceof HeaderResponse))
+ {
+ return false;
+ }
+ if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
+ {
+ return false;
+ }
+ return true;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
new file mode 100644
index 0000000..a6a4a47
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+public interface OutputEncoder
+{
+ ByteBuffer encode(Object msg);
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
new file mode 100644
index 0000000..40a2ca7
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+ private final OutputEncoder _outputEncoder;
+
+ OutputHandler(final OutputEncoder outputEncoder)
+ {
+ _outputEncoder = outputEncoder;
+ }
+
+ @Override
+ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+ {
+ ByteBuffer byteBuffer = _outputEncoder.encode(msg);
+ if (byteBuffer != null)
+ {
+ send(ctx, byteBuffer, promise);
+ }
+ else
+ {
+ super.write(ctx, msg, promise);
+ }
+ }
+
+ private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+ {
+ byte[] data = new byte[dataByteBuffer.remaining()];
+ dataByteBuffer.get(data);
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(data);
+ try
+ {
+ OutputHandler.super.write(ctx, buffer, promise);
+ }
+ catch (Exception e)
+ {
+ promise.setFailure(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
new file mode 100644
index 0000000..debc06f
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+public interface Response<T>
+{
+ T getBody();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
new file mode 100644
index 0000000..db6d7a1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface SpecificationTest
+{
+ String section();
+ String description();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org