You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/22 15:02:48 UTC
[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 0-10] Add
protocol tests for AMQP 0-10
QPID-8038: [Broker-J][AMQP 0-10] Add protocol tests for AMQP 0-10
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ff2980e2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ff2980e2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ff2980e2
Branch: refs/heads/master
Commit: ff2980e2d6e9520ba204acd41a78e9ee412a2c11
Parents: 612c2cb
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Nov 21 17:16:42 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 22 15:02:23 2017 +0000
----------------------------------------------------------------------
pom.xml | 47 ++--
systests/protocol-tests-amqp-0-10/pom.xml | 109 +++++++
.../qpid/tests/protocol/v0_10/Assembler.java | 264 +++++++++++++++++
.../protocol/v0_10/ConnectionInteraction.java | 83 ++++++
.../qpid/tests/protocol/v0_10/Disassembler.java | 281 +++++++++++++++++++
.../tests/protocol/v0_10/ErrorResponse.java | 40 +++
.../protocol/v0_10/ExecutionInteraction.java | 47 ++++
.../qpid/tests/protocol/v0_10/FrameDecoder.java | 190 +++++++++++++
.../qpid/tests/protocol/v0_10/FrameEncoder.java | 87 ++++++
.../tests/protocol/v0_10/FrameTransport.java | 58 ++++
.../qpid/tests/protocol/v0_10/Interaction.java | 138 +++++++++
.../protocol/v0_10/MessageInteraction.java | 147 ++++++++++
.../protocol/v0_10/PerformativeResponse.java | 48 ++++
.../protocol/v0_10/ProtocolEventReceiver.java | 67 +++++
.../protocol/v0_10/SessionInteraction.java | 89 ++++++
.../tests/protocol/v0_10/TxInteraction.java | 60 ++++
.../resources/config-protocol-tests-0-10.json | 78 +++++
.../tests/protocol/v0_10/ConnectionTest.java | 214 ++++++++++++++
.../qpid/tests/protocol/v0_10/MessageTest.java | 260 +++++++++++++++++
.../qpid/tests/protocol/v0_10/SessionTest.java | 123 ++++++++
.../tests/protocol/v0_10/TransactionTest.java | 92 ++++++
.../protocol/v0_8/ExchangeInteraction.java | 58 ++++
.../tests/protocol/v0_8/FrameTransport.java | 12 +-
.../qpid/tests/protocol/v0_8/Interaction.java | 23 +-
.../qpid/tests/protocol/v0_8/TxInteraction.java | 44 +++
.../qpid/tests/protocol/v0_8/ChannelTest.java | 2 +-
.../tests/protocol/v0_8/ConnectionTest.java | 26 +-
.../tests/protocol/v0_8/TransactionTest.java | 79 ++++++
.../tests/protocol/v1_0/FrameTransport.java | 4 +-
.../qpid/tests/protocol/v1_0/Interaction.java | 9 +-
.../tests/protocol/AbstractFrameTransport.java | 175 ++++++++++++
.../tests/protocol/AbstractInteraction.java | 150 ++++++++++
.../tests/protocol/ChannelClosedResponse.java | 36 +++
.../qpid/tests/protocol/FrameTransport.java | 190 -------------
.../apache/qpid/tests/protocol/Interaction.java | 147 ----------
35 files changed, 3097 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e7d58bb..0c0c445 100644
--- a/pom.xml
+++ b/pom.xml
@@ -198,6 +198,7 @@
<module>systests/qpid-systests-jms_2.0</module>
<module>systests/protocol-tests-core</module>
<module>systests/protocol-tests-amqp-0-8</module>
+ <module>systests/protocol-tests-amqp-0-10</module>
<module>systests/protocol-tests-amqp-1-0</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
@@ -427,6 +428,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-amqp-0-10</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
@@ -1332,26 +1339,26 @@
</properties>
</profile>
- <profile>
- <id>java-json.0-10</id>
- <activation>
- <property>
- <name>profile</name>
- <value>java-json.0-10</value>
- </property>
- </activation>
- <properties>
- <profile>java-json.0-10</profile>
- <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
- <profile.broker.version>v0_10</profile.broker.version>
- <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
- <profile.broker.persistent>true</profile.broker.persistent>
- <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
- <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
- </properties>
- </profile>
-
- <profile>
+ <profile>
+ <id>java-json.0-10</id>
+ <activation>
+ <property>
+ <name>profile</name>
+ <value>java-json.0-10</value>
+ </property>
+ </activation>
+ <properties>
+ <profile>java-json.0-10</profile>
+ <profile.specific.excludes>JavaPersistentExcludes JavaJsonExcludes XAExcludes Java010Excludes</profile.specific.excludes>
+ <profile.broker.version>v0_10</profile.broker.version>
+ <profile.test.amqp_port_protocols>["AMQP_0_8","AMQP_0_9","AMQP_0_9_1","AMQP_0_10"]</profile.test.amqp_port_protocols>
+ <profile.broker.persistent>true</profile.broker.persistent>
+ <profile.virtualhostnode.type>JSON</profile.virtualhostnode.type>
+ <profile.virtualhostnode.context.blueprint>{"type":"BDB","globalAddressDomains":"${dollar.sign}{qpid.globalAddressDomains}"}</profile.virtualhostnode.context.blueprint>
+ </properties>
+ </profile>
+
+ <profile>
<id>cpp</id>
<activation>
<property>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/pom.xml b/systests/protocol-tests-amqp-0-10/pom.xml
new file mode 100644
index 0000000..3acf129
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.1.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-amqp-0-10</artifactId>
+ <name>Apache Qpid Protocol Tests for AMQP 0-10</name>
+ <description>Tests for AMQP 0-10</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-0-10-protocol</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-test-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-utils</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-logging-logback</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-memory-store</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-derby-store</artifactId>
+ <optional>true</optional>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-bdbstore</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!--
+ Surefire passes qpid.initialConfigurationLocation to the test JVM as a system
+ property, pointing at config-protocol-tests-0-10.json on the classpath.
+ NOTE(review): presumably read by the broker started for the tests as its
+ initial configuration - confirm against the test utilities.
+ -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <qpid.initialConfigurationLocation>classpath:config-protocol-tests-0-10.json</qpid.initialConfigurationLocation>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
new file mode 100644
index 0000000..aac39b6
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Assembler.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.DeliveryProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageProperties;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.NetworkEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+
+public class Assembler implements NetworkDelegate
+{
+
+ private static final int ARRAY_SIZE = 0xFF;
+ private final Method[] _incompleteMethodArray = new Method[ARRAY_SIZE + 1];
+ private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
+
+ private final ProtocolEventReceiver receiver;
+ private final Map<Integer, List<Frame>> segments;
+ private static final ThreadLocal<BBDecoder> _decoder = ThreadLocal.withInitial(BBDecoder::new);
+
+ Assembler(ProtocolEventReceiver receiver)
+ {
+ this.receiver = receiver;
+ segments = new HashMap<>();
+ }
+
+ private int segmentKey(Frame frame)
+ {
+ return (frame.getTrack() + 1) * frame.getChannel();
+ }
+
+ private List<Frame> getSegment(Frame frame)
+ {
+ return segments.get(segmentKey(frame));
+ }
+
+ private void setSegment(Frame frame, List<Frame> segment)
+ {
+ int key = segmentKey(frame);
+ if (segments.containsKey(key))
+ {
+ error(new ProtocolError(Frame.L2, "segment in progress: %s",
+ frame));
+ }
+ segments.put(segmentKey(frame), segment);
+ }
+
+ private void clearSegment(Frame frame)
+ {
+ segments.remove(segmentKey(frame));
+ }
+
+ private void emit(int channel, ProtocolEvent event)
+ {
+ event.setChannel(channel);
+ receiver.received(event);
+ }
+
+ void received(NetworkEvent event)
+ {
+ event.delegate(this);
+ }
+
+ public void init(ProtocolHeader header)
+ {
+ emit(0, header);
+ }
+
+ public void error(ProtocolError error)
+ {
+ emit(0, error);
+ }
+
+ public void frame(Frame frame)
+ {
+ ByteBuffer segment;
+ if (frame.isFirstFrame() && frame.isLastFrame())
+ {
+ segment = frame.getBody();
+ assemble(frame, segment);
+ }
+ else
+ {
+ List<Frame> frames;
+ if (frame.isFirstFrame())
+ {
+ frames = new ArrayList<>();
+ setSegment(frame, frames);
+ }
+ else
+ {
+ frames = getSegment(frame);
+ }
+
+ frames.add(frame);
+
+ if (frame.isLastFrame())
+ {
+ clearSegment(frame);
+
+ int size = 0;
+ for (Frame f : frames)
+ {
+ size += f.getSize();
+ }
+ segment = allocateByteBuffer(size);
+ for (Frame f : frames)
+ {
+ segment.put(f.getBody());
+ }
+ segment.flip();
+ assemble(frame, segment);
+ }
+ }
+ }
+
+ private ByteBuffer allocateByteBuffer(final int size)
+ {
+ return ByteBuffer.allocate(size);
+ }
+
+ private void assemble(Frame frame, ByteBuffer segment)
+ {
+ BBDecoder dec = _decoder.get();
+ dec.init(segment);
+
+ int channel = frame.getChannel();
+ Method command;
+
+ switch (frame.getType())
+ {
+ case CONTROL:
+ int controlType = dec.readUint16();
+ Method control = Method.create(controlType);
+ control.read(dec);
+ emit(channel, control);
+ break;
+ case COMMAND:
+ int commandType = dec.readUint16();
+ // read in the session header, right now we don't use it
+ int hdr = dec.readUint16();
+ command = Method.create(commandType);
+ command.setSync((0x0001 & hdr) != 0);
+ command.read(dec);
+ if (command.hasPayload() && !frame.isLastSegment())
+ {
+ setIncompleteCommand(channel, command);
+ }
+ else
+ {
+ emit(channel, command);
+ }
+ break;
+ case HEADER:
+ command = getIncompleteCommand(channel);
+ List<Struct> structs = null;
+ DeliveryProperties deliveryProps = null;
+ MessageProperties messageProps = null;
+
+ while (dec.hasRemaining())
+ {
+ Struct struct = dec.readStruct32();
+ if (struct instanceof DeliveryProperties && deliveryProps == null)
+ {
+ deliveryProps = (DeliveryProperties) struct;
+ }
+ else if (struct instanceof MessageProperties && messageProps == null)
+ {
+ messageProps = (MessageProperties) struct;
+ }
+ else
+ {
+ if (structs == null)
+ {
+ structs = new ArrayList<>(2);
+ }
+ structs.add(struct);
+ }
+ }
+ command.setHeader(new Header(deliveryProps, messageProps, structs));
+
+ if (frame.isLastSegment())
+ {
+ setIncompleteCommand(channel, null);
+ emit(channel, command);
+ }
+ break;
+ case BODY:
+ command = getIncompleteCommand(channel);
+ command.setBody(QpidByteBuffer.wrap(segment));
+ setIncompleteCommand(channel, null);
+ emit(channel, command);
+ break;
+ default:
+ throw new IllegalStateException("unknown frame type: " + frame.getType());
+ }
+
+ dec.releaseBuffer();
+ }
+
+ private void setIncompleteCommand(int channelId, Method incomplete)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ _incompleteMethodArray[channelId] = incomplete;
+ }
+ else
+ {
+ if (incomplete != null)
+ {
+ _incompleteMethodMap.put(channelId, incomplete);
+ }
+ else
+ {
+ _incompleteMethodMap.remove(channelId);
+ }
+ }
+ }
+
+ private Method getIncompleteCommand(int channelId)
+ {
+ if ((channelId & ARRAY_SIZE) == channelId)
+ {
+ return _incompleteMethodArray[channelId];
+ }
+ else
+ {
+ return _incompleteMethodMap.get(channelId);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
new file mode 100644
index 0000000..d7b54b0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
+
+/**
+ * Fluent helper for the connection-level controls of a test {@link Interaction}.
+ * <p>
+ * The {@code startOk*} and {@code tuneOk*} methods configure the pending control and return
+ * this helper; {@link #startOk()}, {@link #tuneOk()} and {@link #open()} send the
+ * corresponding control through the owning interaction.
+ */
+public class ConnectionInteraction
+{
+    public static final String SASL_MECHANISM_ANONYMOUS = "ANONYMOUS";
+    public static final String SASL_MECHANISM_PLAIN = "PLAIN";
+
+    private final Interaction _interaction;
+    private final ConnectionStartOk _startOk = new ConnectionStartOk();
+    private final ConnectionTuneOk _tuneOk = new ConnectionTuneOk();
+    private final ConnectionOpen _open = new ConnectionOpen();
+
+    /**
+     * @param interaction the interaction used to send the controls
+     */
+    public ConnectionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    // ---- connection.start-ok ----
+
+    /** Sets the SASL mechanism carried by the start-ok control. */
+    public ConnectionInteraction startOkMechanism(final String mechanism)
+    {
+        _startOk.setMechanism(mechanism);
+        return this;
+    }
+
+    /** Sets the SASL response bytes carried by the start-ok control. */
+    public ConnectionInteraction startOkResponse(final byte[] response)
+    {
+        _startOk.setResponse(response);
+        return this;
+    }
+
+    /** Sends the start-ok control as configured so far. */
+    public Interaction startOk() throws Exception
+    {
+        return _interaction.sendPerformative(_startOk);
+    }
+
+    // ---- connection.tune-ok ----
+
+    /** Sets the channel-max value carried by the tune-ok control. */
+    public ConnectionInteraction tuneOkChannelMax(final int channelMax)
+    {
+        _tuneOk.setChannelMax(channelMax);
+        return this;
+    }
+
+    /** Sets the max-frame-size value carried by the tune-ok control. */
+    public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize)
+    {
+        _tuneOk.setMaxFrameSize(maxFrameSize);
+        return this;
+    }
+
+    /** Sends the tune-ok control as configured so far. */
+    public Interaction tuneOk() throws Exception
+    {
+        return _interaction.sendPerformative(_tuneOk);
+    }
+
+    // ---- connection.open ----
+
+    /** Sends the open control. */
+    public Interaction open() throws Exception
+    {
+        return _interaction.sendPerformative(_open);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
new file mode 100644
index 0000000..e60049e
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Disassembler.java
@@ -0,0 +1,281 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static java.lang.Math.min;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.FIRST_SEG;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.HEADER_SIZE;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_FRAME;
+import static org.apache.qpid.server.protocol.v0_10.transport.Frame.LAST_SEG;
+
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.FrameSizeObserver;
+import org.apache.qpid.server.protocol.v0_10.ProtocolEventSender;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.Header;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolDelegate;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.server.protocol.v0_10.transport.Struct;
+import org.apache.qpid.server.transport.ByteBufferSender;
+
+/**
+ * Disassembler
+ */
+/**
+ * Encodes protocol events into AMQP 0-10 frames and writes them to a {@link ByteBufferSender}.
+ * <p>
+ * A method is encoded into a scratch buffer, split into method/header/body segments, and each
+ * segment is cut into frames whose payload does not exceed the configured maximum frame size
+ * minus the frame header. All writes to the sender happen while holding an internal lock.
+ */
+public final class Disassembler implements ProtocolEventSender, ProtocolDelegate<Void>, FrameSizeObserver
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(Disassembler.class);
+    private static final ThreadLocal<BBEncoder> _encoder = ThreadLocal.withInitial(() -> new BBEncoder(4 * 1024));
+
+    private final ByteBufferSender _sender;
+    private final Object _sendlock = new Object();
+    private volatile int _maxPayload;
+
+    /**
+     * @param sender   destination for the encoded bytes
+     * @param maxFrame maximum total frame size (header included)
+     * @throws IllegalArgumentException if {@code maxFrame} is not greater than the frame header
+     *                                  size or not below 64K
+     */
+    public Disassembler(ByteBufferSender sender, int maxFrame)
+    {
+        _sender = sender;
+        _maxPayload = maxPayloadFor(maxFrame);
+    }
+
+    /** Dispatches the event back into this delegate, which encodes and sends it. */
+    public void send(ProtocolEvent event)
+    {
+        event.delegate(null, this);
+    }
+
+    public void flush()
+    {
+        synchronized (_sendlock)
+        {
+            _sender.flush();
+        }
+    }
+
+    public void close()
+    {
+        synchronized (_sendlock)
+        {
+            _sender.close();
+        }
+    }
+
+    /** Sends the protocol header and flushes immediately. */
+    public void init(Void v, ProtocolHeader header)
+    {
+        synchronized (_sendlock)
+        {
+            _sender.send(header.toByteBuffer());
+            _sender.flush();
+        }
+    }
+
+    public void control(Void v, Method method)
+    {
+        method(method, SegmentType.CONTROL);
+    }
+
+    public void command(Void v, Method method)
+    {
+        method(method, SegmentType.COMMAND);
+    }
+
+    /**
+     * Encodes a control or command and sends it as one to three segments: the method itself,
+     * then - only if the method has a payload - a header segment and, when a body is present,
+     * a body segment.
+     */
+    private void method(Method method, SegmentType type)
+    {
+        BBEncoder enc = _encoder.get();
+        enc.init();
+        enc.writeUint16(method.getEncodedType());
+        if (type == SegmentType.COMMAND)
+        {
+            // commands carry an extra 16-bit header; its low bit is set when the command is sync
+            if (method.isSync())
+            {
+                enc.writeUint16(0x0101);
+            }
+            else
+            {
+                enc.writeUint16(0x0100);
+            }
+        }
+        method.write(enc);
+        int methodLimit = enc.position();
+
+        byte flags = FIRST_SEG;
+
+        boolean payload = method.hasPayload();
+        if (!payload)
+        {
+            flags |= LAST_SEG;
+        }
+
+        int headerLimit = -1;
+        if (payload)
+        {
+            final Header hdr = method.getHeader();
+            if (hdr != null)
+            {
+                if (hdr.getDeliveryProperties() != null)
+                {
+                    enc.writeStruct32(hdr.getDeliveryProperties());
+                }
+                if (hdr.getMessageProperties() != null)
+                {
+                    enc.writeStruct32(hdr.getMessageProperties());
+                }
+                if (hdr.getNonStandardProperties() != null)
+                {
+                    for (Struct st : hdr.getNonStandardProperties())
+                    {
+                        enc.writeStruct32(st);
+                    }
+                }
+            }
+            headerLimit = enc.position();
+        }
+
+        synchronized (_sendlock)
+        {
+            // snapshot the encoder contents so the thread-local encoder can be reused
+            ByteBuffer buf = enc.underlyingBuffer();
+            buf.flip();
+            ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+            copy.put(buf.duplicate());
+            copy.flip();
+
+            final ByteBuffer methodBuf = view(copy, 0, methodLimit);
+            fragment(flags, type, method, methodBuf);
+            if (payload)
+            {
+                QpidByteBuffer qpidByteBuffer = method.getBody();
+                ByteBuffer body = null;
+                if (qpidByteBuffer != null)
+                {
+                    body = ByteBuffer.allocate(qpidByteBuffer.remaining());
+                    // NOTE(review): the BODY segment below is cut from body.duplicate(), which
+                    // assumes copyTo() leaves 'body' positioned at the start of the copied
+                    // bytes - confirm against QpidByteBuffer.copyTo(ByteBuffer).
+                    qpidByteBuffer.copyTo(body);
+                }
+                // view() takes a length, not an end offset: the header bytes are those written
+                // between methodLimit and headerLimit
+                ByteBuffer headerBuf = view(copy, methodLimit, headerLimit - methodLimit);
+                fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerBuf);
+                if (body != null)
+                {
+                    fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
+                }
+            }
+        }
+    }
+
+    /**
+     * Cuts one segment into frames of at most {@code _maxPayload} bytes. The first frame gets
+     * FIRST_FRAME, the frame that exhausts the buffer gets LAST_FRAME; an empty buffer still
+     * produces a single frame carrying both flags.
+     */
+    private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buffer)
+    {
+        byte typeb = (byte) type.getValue();
+        byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
+
+        int remaining = buffer.remaining();
+        boolean first = true;
+        while (true)
+        {
+            int size = min(_maxPayload, remaining);
+            remaining -= size;
+
+            byte newflags = flags;
+            if (first)
+            {
+                newflags |= FIRST_FRAME;
+                first = false;
+            }
+            if (remaining == 0)
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            frame(newflags, typeb, track, event.getChannel(), size, buffer);
+
+            if (remaining == 0)
+            {
+                break;
+            }
+        }
+    }
+
+    /**
+     * Writes one frame: a HEADER_SIZE-byte frame header followed by {@code size} bytes taken
+     * from the current position of {@code buffer}, whose position is then advanced by
+     * {@code size}.
+     */
+    private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buffer)
+    {
+        ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+
+        // absolute puts: position stays 0, so the whole header is sent
+        data.put(0, flags);
+        data.put(1, type);
+        data.putShort(2, (short) (size + HEADER_SIZE));
+        data.put(4, (byte) 0);
+        data.put(5, track);
+        data.putShort(6, (short) channel);
+
+        try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(data))
+        {
+            _sender.send(qpidByteBuffer);
+        }
+
+        if (size > 0)
+        {
+            final ByteBuffer view = view(buffer, 0, size);
+            try (QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(view))
+            {
+                _sender.send(qpidByteBuffer);
+            }
+            buffer.position(buffer.position() + size);
+        }
+    }
+
+    /**
+     * Protocol errors cannot be sent by this disassembler.
+     *
+     * @throws IllegalArgumentException always, carrying the error's string form
+     */
+    public void error(Void v, ProtocolError error)
+    {
+        throw new IllegalArgumentException(String.valueOf(error));
+    }
+
+    /**
+     * @throws IllegalArgumentException if {@code maxFrame} is not greater than the frame header
+     *                                  size or not below 64K
+     */
+    @Override
+    public void setMaxFrameSize(final int maxFrame)
+    {
+        _maxPayload = maxPayloadFor(maxFrame);
+    }
+
+    /** Validates a maximum frame size and returns the payload size it leaves per frame. */
+    private static int maxPayloadFor(final int maxFrame)
+    {
+        if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
+        {
+            throw new IllegalArgumentException("maxFrame must be > HEADER_SIZE and < 64K: " + maxFrame);
+        }
+        return maxFrame - HEADER_SIZE;
+    }
+
+    /**
+     * Returns an independent slice of {@code buffer} that starts {@code offset} bytes after the
+     * buffer's current position and spans {@code length} bytes (clamped to what is available).
+     * The position and limit of {@code buffer} itself are not modified.
+     */
+    private static ByteBuffer view(ByteBuffer buffer, int offset, int length)
+    {
+        ByteBuffer view = buffer.slice();
+        view.position(offset);
+        int newLimit = Math.min(view.position() + length, view.capacity());
+        view.limit(newLimit);
+        return view.slice();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
new file mode 100644
index 0000000..fa79489
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ErrorResponse.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * {@link Response} whose body is a {@link ProtocolError}.
+ */
+public class ErrorResponse implements Response<ProtocolError>
+{
+    private final ProtocolError _protocolError;
+
+    /**
+     * @param protocolError the error to expose as the response body
+     */
+    public ErrorResponse(final ProtocolError protocolError)
+    {
+        _protocolError = protocolError;
+    }
+
+    /** Returns the error this response was created with. */
+    @Override
+    public ProtocolError getBody()
+    {
+        return _protocolError;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
new file mode 100644
index 0000000..2e6817b
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ExecutionInteraction.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ExecutionSync;
+
+/**
+ * Fluent helper for sending an execution sync command through a test {@link Interaction}.
+ */
+public class ExecutionInteraction
+{
+    private final Interaction _interaction;
+    private final ExecutionSync _sync = new ExecutionSync();
+
+    /**
+     * @param interaction the interaction used to send the command
+     */
+    public ExecutionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    /** Sets the id of the pending sync command and returns this helper. */
+    public ExecutionInteraction syncId(final int id)
+    {
+        _sync.setId(id);
+        return this;
+    }
+
+    /** Sends the sync command and returns the owning interaction. */
+    public Interaction sync() throws Exception
+    {
+        final Interaction owner = _interaction;
+        owner.sendPerformative(_sync);
+        return owner;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
new file mode 100644
index 0000000..fff894b
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameDecoder.java
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.apache.qpid.server.transport.util.Functions.str;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Collection;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Frame;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+
+ private final ProtocolEventReceiver _receiver;
+
+ public enum State
+ {
+ PROTO_HDR,
+ FRAME_HDR,
+ FRAME_BODY,
+ ERROR
+ }
+
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final Assembler _assembler;
+
+ private int _maxFrameSize = 4096;
+ private State _state;
+ private ByteBuffer input = null;
+ private int _needed;
+
+ private byte _flags;
+ private SegmentType _type;
+ private byte _track;
+ private int _channel;
+
+ FrameDecoder(final byte[] headerBytes)
+ {
+ _receiver = new ProtocolEventReceiver(headerBytes);
+ this._assembler = new Assembler(_receiver);
+ this._state = State.PROTO_HDR;
+ _needed = 8;
+
+ }
+
+ @Override
+ public Collection<Response<?>> decode(final ByteBuffer buf) throws Exception
+ {
+ int limit = buf.limit();
+ int remaining = buf.remaining();
+ while (remaining > 0)
+ {
+ if (remaining >= _needed)
+ {
+ int consumed = _needed;
+ int pos = buf.position();
+ if (input == null)
+ {
+ buf.limit(pos + _needed);
+ input = buf;
+ _state = next(pos);
+ buf.limit(limit);
+ buf.position(pos + consumed);
+ }
+ else
+ {
+ buf.limit(pos + _needed);
+ input.put(buf);
+ buf.limit(limit);
+ input.flip();
+ _state = next(0);
+ }
+
+ remaining -= consumed;
+ input = null;
+ }
+ else
+ {
+ if (input == null)
+ {
+ input = ByteBuffer.allocate(_needed);
+ }
+ input.put(buf);
+ _needed -= remaining;
+ remaining = 0;
+ }
+ }
+ return _receiver.getReceivedEvents();
+ }
+
+ private State next(int pos)
+ {
+ input.order(ByteOrder.BIG_ENDIAN);
+
+ switch (_state) {
+ case PROTO_HDR:
+ if (input.get(pos) != 'A' &&
+ input.get(pos + 1) != 'M' &&
+ input.get(pos + 2) != 'Q' &&
+ input.get(pos + 3) != 'P')
+ {
+ error("bad protocol header: %s", str(input));
+ return State.ERROR;
+ }
+
+ byte protoClass = input.get(pos + 4);
+ byte instance = input.get(pos + 5);
+ byte major = input.get(pos + 6);
+ byte minor = input.get(pos + 7);
+ _assembler.received(new ProtocolHeader(protoClass, instance, major, minor));
+ _needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ case FRAME_HDR:
+ _flags = input.get(pos);
+ _type = SegmentType.get(input.get(pos + 1));
+ int size = (0xFFFF & input.getShort(pos + 2));
+ size -= Frame.HEADER_SIZE;
+ _maxFrameSize = 64 * 1024;
+ if (size < 0 || size > (_maxFrameSize - 12))
+ {
+ error("bad frame size: %d", size);
+ return State.ERROR;
+ }
+ byte b = input.get(pos + 5);
+ if ((b & 0xF0) != 0) {
+ error("non-zero reserved bits in upper nibble of " +
+ "frame header byte 5: '%x'", b);
+ return State.ERROR;
+ } else {
+ _track = (byte) (b & 0xF);
+ }
+ _channel = (0xFFFF & input.getShort(pos + 6));
+ if (size == 0)
+ {
+ Frame frame = new Frame(_flags, _type, _track, _channel, EMPTY_BYTE_BUFFER);
+ _assembler.received(frame);
+ _needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ }
+ else
+ {
+ _needed = size;
+ return State.FRAME_BODY;
+ }
+ case FRAME_BODY:
+ Frame frame = new Frame(_flags, _type, _track, _channel, input.slice());
+ _assembler.received(frame);
+ _needed = Frame.HEADER_SIZE;
+ return State.FRAME_HDR;
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private void error(String fmt, Object ... args)
+ {
+ _assembler.received(new ProtocolError(Frame.L1, fmt, args));
+ }
+
+ public void setMaxFrameSize(final int maxFrameSize)
+ {
+ _maxFrameSize = maxFrameSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
new file mode 100644
index 0000000..dfec4f4
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameEncoder.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+ @Override
+ public ByteBuffer encode(final Object msg)
+ {
+ if (msg instanceof ProtocolEvent)
+ {
+ final List<ByteBuffer> buffers = new ArrayList<>();
+ final AtomicInteger totalSize = new AtomicInteger();
+ Disassembler disassembler = new Disassembler(new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ int remaining = msg.remaining();
+ byte[] data = new byte[remaining];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+ msg.get(data);
+ buffers.add(byteBuffer);
+ totalSize.addAndGet(remaining);
+ }
+
+ @Override
+ public void flush()
+ {
+
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ }, 512);
+
+ disassembler.send((ProtocolEvent) msg);
+ ByteBuffer data = ByteBuffer.allocate(totalSize.get());
+ for (ByteBuffer buffer : buffers)
+ {
+ data.put(buffer);
+ }
+ data.flip();
+ return data;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
new file mode 100644
index 0000000..3b7849c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/FrameTransport.java
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.net.InetSocketAddress;
+
+import org.apache.qpid.server.protocol.v0_10.ProtocolEngineCreator_0_10;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+
+
+/**
+ * Frame transport for the AMQP 0-10 protocol tests, wiring a {@link FrameDecoder} and a
+ * {@link FrameEncoder} into {@link AbstractFrameTransport}.
+ */
+public class FrameTransport extends AbstractFrameTransport<Interaction>
+{
+    private final byte[] _protocolHeader;
+
+    /**
+     * @param brokerAddress address of the broker to connect to
+     */
+    public FrameTransport(final InetSocketAddress brokerAddress)
+    {
+        super(brokerAddress, new FrameDecoder(headerIdentifier()), new FrameEncoder());
+        _protocolHeader = headerIdentifier();
+    }
+
+    /** Obtains the header identifier from a fresh {@link ProtocolEngineCreator_0_10}. */
+    private static byte[] headerIdentifier()
+    {
+        return new ProtocolEngineCreator_0_10().getHeaderIdentifier();
+    }
+
+    /**
+     * @return the protocol header bytes obtained at construction time
+     */
+    @Override
+    public byte[] getProtocolHeader()
+    {
+        return _protocolHeader;
+    }
+
+    /**
+     * @return a new {@link Interaction} bound to this transport
+     */
+    @Override
+    public Interaction newInteraction()
+    {
+        return new Interaction(this);
+    }
+
+    /**
+     * Connects via the superclass and narrows the return type for chaining.
+     *
+     * @return this transport
+     */
+    @Override
+    public FrameTransport connect()
+    {
+        super.connect();
+        return this;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
new file mode 100644
index 0000000..5d53a89
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/Interaction.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.server.protocol.v0_10.transport.BBDecoder;
+import org.apache.qpid.server.protocol.v0_10.transport.BBEncoder;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttached;
+import org.apache.qpid.tests.protocol.AbstractFrameTransport;
+import org.apache.qpid.tests.protocol.AbstractInteraction;
+
+/**
+ * Fluent entry point for scripting an AMQP 0-10 exchange with the broker in protocol tests.
+ * <p>
+ * Per-class helpers ({@link #connection()}, {@link #session()}, {@link #message()},
+ * {@link #execution()}, {@link #tx()}) hold reusable performatives; every send goes through
+ * {@link #sendPerformative(Method)}, which applies the currently selected channel id.
+ */
+public class Interaction extends AbstractInteraction<Interaction>
+{
+    private final ConnectionInteraction _connectionInteraction;
+    private final SessionInteraction _sessionInteraction;
+    private final MessageInteraction _messageInteraction;
+    private final ExecutionInteraction _executionInteraction;
+    private final TxInteraction _txInteraction;
+    private int _channelId;
+
+    /**
+     * @param frameTransport transport used to exchange frames with the broker
+     */
+    public Interaction(final AbstractFrameTransport frameTransport)
+    {
+        super(frameTransport);
+        _connectionInteraction = new ConnectionInteraction(this);
+        _sessionInteraction = new SessionInteraction(this);
+        _messageInteraction = new MessageInteraction(this);
+        _executionInteraction = new ExecutionInteraction(this);
+        _txInteraction = new TxInteraction(this);
+    }
+
+    @Override
+    protected byte[] getProtocolHeader()
+    {
+        return getTransport().getProtocolHeader();
+    }
+
+    /**
+     * Sets the current channel id on {@code performative} and sends a copy of it.
+     *
+     * @param performative the method to send; its channel is overwritten with the current
+     *                     channel id
+     * @return this interaction, for chaining
+     * @throws Exception if sending fails
+     */
+    public <T extends Method> Interaction sendPerformative(final T performative) throws Exception
+    {
+        performative.setChannel(_channelId);
+        sendPerformativeAndChainFuture(copyPerformative(performative));
+        return this;
+    }
+
+    public ConnectionInteraction connection()
+    {
+        return _connectionInteraction;
+    }
+
+    /**
+     * Creates an independent copy of {@code src} by encoding it and decoding the result into a
+     * freshly created method of the same struct type.
+     *
+     * @param src the method to copy
+     * @return a new method instance carrying the encoded fields and the channel of {@code src}
+     */
+    private <T extends Method> T copyPerformative(final T src)
+    {
+        T dst = (T) Method.create(src.getStructType());
+        final BBEncoder encoder = new BBEncoder(4096);
+        encoder.init();
+        src.write(encoder);
+        ByteBuffer buffer = encoder.buffer();
+
+        final BBDecoder decoder = new BBDecoder();
+        decoder.init(buffer);
+        dst.read(decoder);
+        // The channel is set on the source by sendPerformative() and is not restored by the
+        // write/read round trip (NOTE(review): confirm against Method.write), so carry it
+        // over explicitly; otherwise the copy would not honour channelId(...).
+        dst.setChannel(src.getChannel());
+        return dst;
+    }
+
+    /**
+     * Runs the connection handshake using the ANONYMOUS SASL mechanism: protocol negotiation,
+     * start/start-ok, tune/tune-ok, then open, consuming {@link ConnectionOpenOk}.
+     *
+     * @return this interaction, for chaining
+     * @throws Exception if any step fails or an unexpected response is received
+     */
+    public Interaction openAnonymousConnection() throws Exception
+    {
+        this.negotiateProtocol().consumeResponse()
+            .consumeResponse(ConnectionStart.class)
+            .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+            .consumeResponse(ConnectionTune.class)
+            .connection().tuneOk()
+            .connection().open()
+            .consumeResponse(ConnectionOpenOk.class);
+        return this;
+    }
+
+    public SessionInteraction session()
+    {
+        return _sessionInteraction;
+    }
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    /**
+     * Selects the channel applied to every subsequently sent performative.
+     *
+     * @param channelId the channel id
+     * @return this interaction, for chaining
+     */
+    public Interaction channelId(final int channelId)
+    {
+        _channelId = channelId;
+        return this;
+    }
+
+    /**
+     * Attaches a session named {@code sessionName}, consumes {@link SessionAttached} and sends
+     * a command point with command id 0.
+     *
+     * @param sessionName the session name
+     * @return this interaction, for chaining
+     * @throws Exception if any step fails or an unexpected response is received
+     */
+    public Interaction attachSession(final byte[] sessionName) throws Exception
+    {
+        this.session()
+            .attachName(sessionName)
+            .attach()
+            .consumeResponse(SessionAttached.class)
+            .session().commandPointCommandId(0).commandPoint();
+        return this;
+    }
+
+    public MessageInteraction message()
+    {
+        return _messageInteraction;
+    }
+
+    public ExecutionInteraction execution()
+    {
+        return _executionInteraction;
+    }
+
+    public TxInteraction tx()
+    {
+        return _txInteraction;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
new file mode 100644
index 0000000..4660c86
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/MessageInteraction.java
@@ -0,0 +1,147 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAccept;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcceptMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageAcquireMode;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageCreditUnit;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageFlow;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageSubscribe;
+import org.apache.qpid.server.protocol.v0_10.transport.MessageTransfer;
+import org.apache.qpid.server.protocol.v0_10.transport.RangeSet;
+
+/**
+ * Fluent helper that configures reusable message-class performatives
+ * ({@link MessageTransfer}, {@link MessageSubscribe}, {@link MessageFlow},
+ * {@link MessageAccept}) and sends them through the owning {@link Interaction}.
+ * <p>
+ * Setter-style methods return this helper; send methods return the interaction.
+ */
+public class MessageInteraction
+{
+    private final Interaction _interaction;
+    private final MessageTransfer _transfer;
+    private final MessageSubscribe _subscribe;
+    private final MessageFlow _flow;
+    private final MessageAccept _accept;
+
+    /**
+     * @param interaction the interaction used to send performatives
+     */
+    public MessageInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+        _transfer = new MessageTransfer();
+        _subscribe = new MessageSubscribe();
+        _flow = new MessageFlow();
+        _accept = new MessageAccept();
+    }
+
+    public MessageInteraction transferId(final int id)
+    {
+        _transfer.setId(id);
+        return this;
+    }
+
+    /**
+     * Sets the destination on the pending {@link MessageTransfer}.
+     *
+     * @param destination the transfer destination
+     * @return this helper, for chaining
+     */
+    public MessageInteraction transferDestination(final String destination)
+    {
+        _transfer.setDestination(destination);
+        return this;
+    }
+
+    /**
+     * Misspelled variant of {@link #transferDestination(String)}, retained so existing callers
+     * keep compiling; new code should use the correctly spelled method.
+     *
+     * @param destination the transfer destination
+     * @return this helper, for chaining
+     */
+    public MessageInteraction transferDesitnation(final String destination)
+    {
+        return transferDestination(destination);
+    }
+
+    /**
+     * Sends the pending {@link MessageTransfer}.
+     *
+     * @return the owning interaction, for chaining
+     * @throws Exception if sending fails
+     */
+    public Interaction transfer() throws Exception
+    {
+        return _interaction.sendPerformative(_transfer);
+    }
+
+    public MessageInteraction subscribeQueue(final String queueName)
+    {
+        _subscribe.setQueue(queueName);
+        return this;
+    }
+
+    public MessageInteraction subscribeId(final int id)
+    {
+        _subscribe.setId(id);
+        return this;
+    }
+
+    /**
+     * Sends the pending {@link MessageSubscribe}.
+     *
+     * @return the owning interaction, for chaining
+     * @throws Exception if sending fails
+     */
+    public Interaction subscribe() throws Exception
+    {
+        return _interaction.sendPerformative(_subscribe);
+    }
+
+    public MessageInteraction subscribeDestination(final String destination)
+    {
+        _subscribe.setDestination(destination);
+        return this;
+    }
+
+    /**
+     * Sends the pending {@link MessageFlow}.
+     *
+     * @return the owning interaction, for chaining
+     * @throws Exception if sending fails
+     */
+    public Interaction flow() throws Exception
+    {
+        return _interaction.sendPerformative(_flow);
+    }
+
+    public MessageInteraction flowId(final int id)
+    {
+        _flow.setId(id);
+        return this;
+    }
+
+    public MessageInteraction flowDestination(final String destination)
+    {
+        _flow.setDestination(destination);
+        return this;
+    }
+
+    public MessageInteraction flowUnit(final MessageCreditUnit unit)
+    {
+        _flow.setUnit(unit);
+        return this;
+    }
+
+    public MessageInteraction flowValue(final long value)
+    {
+        _flow.setValue(value);
+        return this;
+    }
+
+    public MessageInteraction subscribeAcceptMode(final MessageAcceptMode acceptMode)
+    {
+        _subscribe.setAcceptMode(acceptMode);
+        return this;
+    }
+
+    public MessageInteraction subscribeAcquireMode(final MessageAcquireMode acquireMode)
+    {
+        _subscribe.setAcquireMode(acquireMode);
+        return this;
+    }
+
+    /**
+     * Sends the pending {@link MessageAccept}.
+     *
+     * @return the owning interaction, for chaining
+     * @throws Exception if sending fails
+     */
+    public Interaction accept() throws Exception
+    {
+        return _interaction.sendPerformative(_accept);
+    }
+
+    public MessageInteraction acceptId(final int id)
+    {
+        _accept.setId(id);
+        return this;
+    }
+
+    public MessageInteraction acceptTransfers(final RangeSet transfers)
+    {
+        _accept.setTransfers(transfers);
+        return this;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
new file mode 100644
index 0000000..701e92c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/PerformativeResponse.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * {@link Response} wrapping a decoded AMQP 0-10 {@link Method}.
+ */
+public class PerformativeResponse implements Response<Method>
+{
+    private final Method _method;
+
+    /**
+     * @param method the decoded method exposed as the response body
+     */
+    public PerformativeResponse(final Method method)
+    {
+        this._method = method;
+    }
+
+    /**
+     * @return the wrapped method
+     */
+    @Override
+    public Method getBody()
+    {
+        return this._method;
+    }
+
+    @Override
+    public String toString()
+    {
+        final StringBuilder text = new StringBuilder("PerformativeResponse{");
+        text.append("_method=").append(_method);
+        text.append('}');
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
new file mode 100644
index 0000000..37eb66e
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ProtocolEventReceiver.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolEvent;
+import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+
+/**
+ * Collects decoded protocol events and exposes them as test {@link Response} objects.
+ * <p>
+ * Protocol headers become {@link HeaderResponse}s built from the header bytes supplied at
+ * construction, methods become {@link PerformativeResponse}s and protocol errors become
+ * {@link ErrorResponse}s. Any other event type is ignored.
+ */
+public class ProtocolEventReceiver
+{
+    private final Queue<Response<?>> _events = new ConcurrentLinkedQueue<>();
+    private final byte[] _headerBytes;
+
+    /**
+     * @param headerBytes bytes reported in the {@link HeaderResponse} created for each
+     *                    received protocol header
+     */
+    public ProtocolEventReceiver(final byte[] headerBytes)
+    {
+        _headerBytes = headerBytes;
+    }
+
+    /**
+     * Queues the response corresponding to {@code msg}, if its type is recognised.
+     *
+     * @param msg the received protocol event
+     */
+    void received(ProtocolEvent msg)
+    {
+        if (msg instanceof ProtocolHeader)
+        {
+            _events.add(new HeaderResponse(_headerBytes));
+        }
+        else if (msg instanceof Method)
+        {
+            _events.add(new PerformativeResponse((Method) msg));
+        }
+        else if (msg instanceof ProtocolError)
+        {
+            _events.add(new ErrorResponse((ProtocolError) msg));
+        }
+    }
+
+    /**
+     * Removes and returns all queued responses in arrival order.
+     *
+     * @return the drained responses; empty when nothing is queued
+     */
+    public Collection<Response<?>> getReceivedEvents()
+    {
+        // Drain with poll() so that exactly the returned elements are removed, in a single
+        // linear pass, instead of snapshotting and then doing an equality-based removeAll.
+        final Collection<Response<?>> results = new ArrayList<>();
+        Response<?> event;
+        while ((event = _events.poll()) != null)
+        {
+            results.add(event);
+        }
+        return results;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
new file mode 100644
index 0000000..fce711d
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/SessionInteraction.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.Method;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionAttach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionCommandPoint;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionDetach;
+import org.apache.qpid.server.protocol.v0_10.transport.SessionFlush;
+
+/**
+ * Fluent helper that configures reusable session-class performatives
+ * ({@link SessionAttach}, {@link SessionDetach}, {@link SessionCommandPoint},
+ * {@link SessionFlush}) and sends them through the owning {@link Interaction}.
+ * <p>
+ * Setter-style methods return this helper; send methods return the interaction.
+ */
+public class SessionInteraction
+{
+    private final Interaction _interaction;
+    private final SessionAttach _attach = new SessionAttach();
+    private final SessionDetach _detach = new SessionDetach();
+    private final SessionCommandPoint _commandPoint = new SessionCommandPoint();
+    private final SessionFlush _flush = new SessionFlush();
+
+    /**
+     * @param interaction the interaction used to send performatives
+     */
+    public SessionInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    // --- session.attach ---
+
+    public SessionInteraction attachName(final byte[] name)
+    {
+        _attach.setName(name);
+        return this;
+    }
+
+    /** Sends the pending {@link SessionAttach}. */
+    public Interaction attach() throws Exception
+    {
+        return _interaction.sendPerformative(_attach);
+    }
+
+    // --- session.detach ---
+
+    public SessionInteraction detachName(final byte[] sessionName)
+    {
+        _detach.setName(sessionName);
+        return this;
+    }
+
+    /** Sends the pending {@link SessionDetach}. */
+    public Interaction detach() throws Exception
+    {
+        return _interaction.sendPerformative(_detach);
+    }
+
+    // --- session.command-point ---
+
+    public SessionInteraction commandPointCommandId(final int commandId)
+    {
+        _commandPoint.setCommandId(commandId);
+        return this;
+    }
+
+    /** Sends the pending {@link SessionCommandPoint}. */
+    public Interaction commandPoint() throws Exception
+    {
+        return _interaction.sendPerformative(_commandPoint);
+    }
+
+    // --- session.flush ---
+
+    /** Sets the completed flag to {@code true} on the pending {@link SessionFlush}. */
+    public SessionInteraction flushCompleted()
+    {
+        _flush.setCompleted(true);
+        return this;
+    }
+
+    /** Sends the pending {@link SessionFlush}. */
+    public Interaction flush() throws Exception
+    {
+        return _interaction.sendPerformative(_flush);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
new file mode 100644
index 0000000..14c9912
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/TxInteraction.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import org.apache.qpid.server.protocol.v0_10.transport.TxCommit;
+import org.apache.qpid.server.protocol.v0_10.transport.TxSelect;
+
+/**
+ * Fluent helper that configures reusable {@link TxSelect} and {@link TxCommit} performatives
+ * and sends them through the owning {@link Interaction}.
+ */
+public class TxInteraction
+{
+    private final Interaction _interaction;
+    private final TxSelect _select = new TxSelect();
+    private final TxCommit _commit = new TxCommit();
+
+    /**
+     * @param interaction the interaction used to send performatives
+     */
+    public TxInteraction(final Interaction interaction)
+    {
+        _interaction = interaction;
+    }
+
+    // --- tx.select ---
+
+    public TxInteraction selectId(final int id)
+    {
+        _select.setId(id);
+        return this;
+    }
+
+    /** Sends the pending {@link TxSelect}. */
+    public Interaction select() throws Exception
+    {
+        return _interaction.sendPerformative(_select);
+    }
+
+    // --- tx.commit ---
+
+    public TxInteraction commitId(final int id)
+    {
+        _commit.setId(id);
+        return this;
+    }
+
+    /** Sends the pending {@link TxCommit}. */
+    public Interaction commit() throws Exception
+    {
+        return _interaction.sendPerformative(_commit);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
new file mode 100644
index 0000000..69387fb
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/main/resources/config-protocol-tests-0-10.json
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+{
+ "name" : "${broker.name}",
+ "modelVersion" : "7.0",
+ "authenticationproviders" : [ {
+ "name" : "anon",
+ "type" : "Anonymous"
+ }, {
+ "name" : "plain",
+ "type" : "Plain",
+ "secureOnlyMechanisms" : [],
+ "users" : [ {
+ "name" : "admin",
+ "type" : "managed",
+ "password" : "admin"
+ }, {
+ "name" : "guest",
+ "type" : "managed",
+ "password" : "guest"
+ } ]
+ } ],
+ "ports" : [ {
+ "name" : "AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "plain",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_10" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias"
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias"
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias"
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "AMQP_0_10" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ } ],
+ "virtualhostnodes" : []
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ff2980e2/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
new file mode 100644
index 0000000..1072f7c
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -0,0 +1,214 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_10;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+
+import org.hamcrest.core.IsEqual;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTune;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+/**
+ * Protocol-level tests for AMQP 0-10 connection establishment.
+ * <p>
+ * Each test opens a raw {@link FrameTransport} to the broker and drives the exchange
+ * frame by frame through an {@link Interaction}. The tests cover version negotiation,
+ * the start-ok / tune-ok / open sequence, attempts to skip SASL authentication and
+ * validation of the max-frame-size value sent in tune-ok.
+ */
+public class ConnectionTest extends BrokerAdminUsingTestBase
+{
+    /** The only locale the broker is expected to advertise in connection.start. */
+    private static final String DEFAULT_LOCALE = "en_US";
+
+    /** Address of the broker port of type ANONYMOUS_AMQP; resolved before every test. */
+    private InetSocketAddress _brokerAddress;
+
+    /**
+     * Looks up the address of the ANONYMOUS_AMQP port from the broker admin.
+     * Tests that need an authenticating port resolve {@code PortType.AMQP} themselves.
+     */
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    /**
+     * Sends the protocol header and checks the two frames that follow: a
+     * {@link HeaderResponse} whose body equals the header the transport sent, and a
+     * {@link ConnectionStart} offering the ANONYMOUS mechanism and the default locale.
+     * <p>
+     * Hamcrest's {@code contains(item)} only matches an iterable holding exactly that
+     * one item, so the assertions also reject any additional mechanism or locale.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "4.3. Version Negotiation",
+            description = "When the client opens a new socket connection to an AMQP server,"
+                          + " it MUST send a protocol header with the client's preferred protocol version."
+                          + " If the requested protocol version is supported, the server MUST send its own protocol"
+                          + " header with the requested version to the socket, and then implement the protocol accordingly")
+    public void versionNegotiation() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            Response<?> response = interaction.negotiateProtocol().consumeResponse().getLatestResponse();
+            assertThat(response, is(instanceOf(HeaderResponse.class)));
+            assertThat(response.getBody(), is(IsEqual.equalTo(transport.getProtocolHeader())));
+
+            ConnectionStart connectionStart = interaction.consumeResponse().getLatestResponse(ConnectionStart.class);
+            assertThat(connectionStart.getMechanisms(), is(notNullValue()));
+            assertThat(connectionStart.getMechanisms(), contains(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS));
+            assertThat(connectionStart.getLocales(), is(notNullValue()));
+            assertThat(connectionStart.getLocales(), contains(DEFAULT_LOCALE));
+        }
+    }
+
+    /**
+     * Answers connection.start with a start-ok selecting the ANONYMOUS mechanism and
+     * requests the next response as a {@link ConnectionTune}.
+     * <p>
+     * NOTE(review): there is no explicit assertion; the test presumably relies on
+     * {@code getLatestResponse(Class)} failing for a response of another type -
+     * confirm in Interaction.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9.connection.start-ok",
+            description = "An AMQP client MUST handle incoming connection.start controls.")
+    public void startOk() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                       .consumeResponse().getLatestResponse(ConnectionTune.class);
+        }
+    }
+
+    /**
+     * Runs the complete opening sequence on the anonymous port: start-ok (ANONYMOUS),
+     * tune-ok, open; then requests the next response as a {@link ConnectionOpenOk}.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok",
+            description = "This control sends the client's connection tuning parameters to the server."
+                          + " Certain fields are negotiated, others provide capability information.")
+    public void tuneOkAndOpen() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                       .consumeResponse(ConnectionTune.class)
+                       .connection().tuneOk()
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionOpenOk.class);
+        }
+    }
+
+    /**
+     * Tries to skip authentication on the {@code PortType.AMQP} port by sending tune-ok
+     * and open directly after connection.start, without any start-ok. The next response
+     * is requested as a {@link ConnectionClose}.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassBySendingTuneOk() throws Exception
+    {
+        // Deliberately not the anonymous port: an authenticating port is needed here.
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().tuneOk()
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionClose.class);
+        }
+    }
+
+    /**
+     * Tries to skip authentication on the {@code PortType.AMQP} port by sending open
+     * directly after connection.start, without start-ok or tune-ok. The next response
+     * is requested as a {@link ConnectionClose}.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassBySendingOpen() throws Exception
+    {
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse().consumeResponse(ConnectionStart.class)
+                       .connection().open()
+                       .consumeResponse().getLatestResponse(ConnectionClose.class);
+        }
+    }
+
+    /**
+     * Starts a PLAIN SASL exchange on the {@code PortType.AMQP} port and then abandons
+     * it: after consuming {@link ConnectionSecure} the client sends tune-ok and open
+     * instead of answering the challenge.
+     * <p>
+     * NOTE(review): the final {@code consumeResponse} is given two types, presumably
+     * meaning either a {@link ConnectionClose} or a {@link ChannelClosedResponse} is
+     * acceptable - confirm in Interaction.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9",
+            description = "open-connection = C:protocol-header S:START C:START-OK *challenge S:TUNE C:TUNE-OK C:OPEN S:OPEN-OK")
+    public void authenticationBypassAfterSendingStartOk() throws Exception
+    {
+        InetSocketAddress brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+        try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .consumeResponse(ConnectionStart.class)
+                       .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_PLAIN).startOk().consumeResponse(ConnectionSecure.class)
+                       .connection().tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);
+        }
+    }
+
+    /**
+     * Sends a tune-ok carrying the broker's own channel-max but a max-frame-size of
+     * 1024, which is below the 4096 minimum quoted in the annotation, then sends open.
+     * The following response is consumed as a {@link ConnectionClose} or a
+     * {@link ChannelClosedResponse}.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok.minimum",
+            description = "[...] the minimum negotiated value for max-frame-size is also MIN-MAX-FRAME-SIZE [4096]")
+    public void tooSmallFrameSize() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+                                                 .consumeResponse(ConnectionStart.class)
+                                                 .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkMaxFrameSize(1024)
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);
+        }
+    }
+
+    /**
+     * Sends a tune-ok whose max-frame-size is one larger than the value the broker
+     * offered in connection.tune, then sends open. The following response is consumed
+     * as a {@link ConnectionClose} or a {@link ChannelClosedResponse}.
+     * <p>
+     * The test is skipped (JUnit assumption) when the broker's tune carries no
+     * max-frame-size or when that value is not below 0xFFFF, since the "+ 1" value
+     * could not then be expressed.
+     *
+     * @throws Exception if the transport or the interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "9.connection.tune-ok.max-frame-size",
+            description = "If the client specifies a max-frame-size that is higher than the value provided by the server,"
+                          + " the server MUST close the connection without attempting a negotiated close."
+                          + " The server may report the error in some fashion to assist implementers.")
+    public void tooLargeFrameSize() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+                                                 .consumeResponse(ConnectionStart.class)
+                                                 .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            assumeThat(response.hasMaxFrameSize(), is(true));
+            assumeThat(response.getMaxFrameSize(), is(lessThan(0xFFFF)));
+            interaction.connection().tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkMaxFrameSize(response.getMaxFrameSize() + 1)
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionClose.class, ChannelClosedResponse.class);
+        }
+    }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org