You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/10 14:44:41 UTC
[2/6] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests]
Introduce new module 'protocol-tests-core' and move test common functionality
into it
QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it
Cherry picked from 06e53d7 with manual resolutions.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/deab4580
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/deab4580
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/deab4580
Branch: refs/heads/7.0.x
Commit: deab4580b7a876cfe16d35fe40804ac19e277cfc
Parents: 9a76285
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:31:29 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:34:28 2018 +0000
----------------------------------------------------------------------
pom.xml | 13 +
systests/protocol-tests-amqp-1-0/pom.xml | 35 +--
.../qpid/tests/protocol/v1_0/FrameDecoder.java | 276 +++++++++++++++++
.../qpid/tests/protocol/v1_0/FrameEncoder.java | 93 ++++++
.../tests/protocol/v1_0/FrameTransport.java | 205 +------------
.../tests/protocol/v1_0/HeaderResponse.java | 46 ---
.../qpid/tests/protocol/v1_0/InputHandler.java | 305 -------------------
.../qpid/tests/protocol/v1_0/Interaction.java | 141 +++------
.../qpid/tests/protocol/v1_0/Matchers.java | 60 ----
.../qpid/tests/protocol/v1_0/OutputHandler.java | 96 ------
.../protocol/v1_0/PerformativeResponse.java | 1 +
.../qpid/tests/protocol/v1_0/Response.java | 25 --
.../protocol/v1_0/SaslPerformativeResponse.java | 1 +
.../tests/protocol/v1_0/SpecificationTest.java | 34 ---
.../tests/protocol/v1_0/DecodeErrorTest.java | 2 +
.../bindmapjms/TemporaryDestinationTest.java | 4 +-
.../extensions/management/ManagementTest.java | 5 +-
.../extensions/websocket/WebSocketTest.java | 13 +-
.../v1_0/messaging/DeleteOnCloseTest.java | 2 +-
.../protocol/v1_0/messaging/MessageFormat.java | 4 +-
.../v1_0/messaging/MultiTransferTest.java | 4 +-
.../protocol/v1_0/messaging/OutcomeTest.java | 4 +-
.../protocol/v1_0/messaging/TransferTest.java | 11 +-
.../v1_0/transaction/DischargeTest.java | 2 +-
.../transaction/TransactionalTransferTest.java | 4 +-
.../v1_0/transport/ProtocolHeaderTest.java | 2 +-
.../v1_0/transport/connection/OpenTest.java | 7 +-
.../v1_0/transport/link/AttachTest.java | 2 +-
.../protocol/v1_0/transport/link/FlowTest.java | 2 +-
.../transport/link/ResumeDeliveriesTest.java | 12 +-
.../v1_0/transport/security/sasl/SaslTest.java | 2 +-
.../v1_0/transport/session/BeginTest.java | 8 +-
systests/protocol-tests-core/pom.xml | 75 +++++
.../qpid/tests/protocol/FrameTransport.java | 198 ++++++++++++
.../qpid/tests/protocol/HeaderResponse.java | 46 +++
.../qpid/tests/protocol/InputDecoder.java | 30 ++
.../qpid/tests/protocol/InputHandler.java | 81 +++++
.../apache/qpid/tests/protocol/Interaction.java | 141 +++++++++
.../apache/qpid/tests/protocol/Matchers.java | 60 ++++
.../qpid/tests/protocol/OutputEncoder.java | 29 ++
.../qpid/tests/protocol/OutputHandler.java | 69 +++++
.../apache/qpid/tests/protocol/Response.java | 25 ++
.../qpid/tests/protocol/SpecificationTest.java | 34 +++
43 files changed, 1282 insertions(+), 927 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1262002..1287501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
<module>systests</module>
<module>systests/systests-utils</module>
<module>systests/qpid-systests-jms_2.0</module>
+ <module>systests/protocol-tests-core</module>
<module>systests/protocol-tests-amqp-1-0</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
@@ -407,6 +408,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-amqp-1-0</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index 56fc1f7..c392cf7 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -56,6 +56,11 @@
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-systests-utils</artifactId>
</dependency>
@@ -94,38 +99,9 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
<scope>test</scope>
- <optional>true</optional>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
</dependency>
@@ -133,6 +109,7 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-integration</artifactId>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
new file mode 100644
index 0000000..4dc06cb
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(FrameDecoder.class);
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+ private final MyConnectionHandler _connectionHandler;
+ private volatile FrameHandler _frameHandler;
+
+ private enum ParsingState
+ {
+ HEADER,
+ PERFORMATIVES;
+ }
+
+ private final ValueHandler _valueHandler;
+
+ private volatile ParsingState _state = ParsingState.HEADER;
+
+ public FrameDecoder(final boolean isSasl)
+ {
+ _valueHandler = new ValueHandler(TYPE_REGISTRY);
+ _connectionHandler = new MyConnectionHandler();
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
+ }
+
+ @Override
+ public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
+ {
+ List<Response<?>> responses = new ArrayList<>();
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
+ switch(_state)
+ {
+ case HEADER:
+ if (inputBuffer.remaining() >= 8)
+ {
+ byte[] header = new byte[8];
+ inputBuffer.get(header);
+ responses.add(new HeaderResponse(header));
+ _state = ParsingState.PERFORMATIVES;
+ _frameHandler.parse(qpidByteBuffer);
+ }
+ break;
+ case PERFORMATIVES:
+ _frameHandler.parse(qpidByteBuffer);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state : " + _state);
+ }
+
+ Response<?> r;
+ while((r = _connectionHandler._responseQueue.poll())!=null)
+ {
+ responses.add(r);
+ }
+ return responses;
+ }
+
+ private void resetInputHandlerAfterSaslOutcome()
+ {
+ _state = ParsingState.HEADER;
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+ }
+
+ private class MyConnectionHandler implements ConnectionHandler
+ {
+ private volatile int _frameSize = 512;
+ private Queue<Response<?>> _responseQueue = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void receiveOpen(final int channel, final Open close)
+ {
+ }
+
+ @Override
+ public void receiveClose(final int channel, final Close close)
+ {
+
+ }
+
+ @Override
+ public void receiveBegin(final int channel, final Begin begin)
+ {
+
+ }
+
+ @Override
+ public void receiveEnd(final int channel, final End end)
+ {
+
+ }
+
+ @Override
+ public void receiveAttach(final int channel, final Attach attach)
+ {
+
+ }
+
+ @Override
+ public void receiveDetach(final int channel, final Detach detach)
+ {
+
+ }
+
+ @Override
+ public void receiveTransfer(final int channel, final Transfer transfer)
+ {
+
+ }
+
+ @Override
+ public void receiveDisposition(final int channel, final Disposition disposition)
+ {
+
+ }
+
+ @Override
+ public void receiveFlow(final int channel, final Flow flow)
+ {
+
+ }
+
+ @Override
+ public int getMaxFrameSize()
+ {
+ return _frameSize;
+ }
+
+ @Override
+ public int getChannelMax()
+ {
+ return UnsignedShort.MAX_VALUE.intValue();
+ }
+
+ @Override
+ public void handleError(final Error parsingError)
+ {
+ LOGGER.error("Unexpected error {}", parsingError);
+ }
+
+ @Override
+ public boolean closedForInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void receive(final List<ChannelFrameBody> channelFrameBodies)
+ {
+ for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
+ {
+ Response response;
+ Object val = channelFrameBody.getFrameBody();
+ int channel = channelFrameBody.getChannel();
+ if (val instanceof FrameBody)
+ {
+ FrameBody frameBody = (FrameBody) val;
+ if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
+ {
+ _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
+ }
+ response = new PerformativeResponse((short) channel, frameBody);
+ }
+ else if (val instanceof SaslFrameBody)
+ {
+ SaslFrameBody frameBody = (SaslFrameBody) val;
+ response = new SaslPerformativeResponse((short) channel, frameBody);
+
+ if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+ {
+ resetInputHandlerAfterSaslOutcome();
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
+ }
+
+ _responseQueue.add(response);
+ }
+ }
+
+ @Override
+ public void receiveSaslInit(final SaslInit saslInit)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslResponse(final SaslResponse saslResponse)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+ {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
new file mode 100644
index 0000000..56d6e6f
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+
+ @Override
+ public ByteBuffer encode(final Object msg)
+ {
+ if (msg instanceof AMQFrame)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>();
+ FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ byte[] data = new byte[msg.remaining()];
+ msg.get(data);
+ buffers.add(ByteBuffer.wrap(data));
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
+ _frameWriter.send(((AMQFrame) msg));
+
+ int remaining = 0;
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ remaining += byteBuffer.remaining();
+ }
+ ByteBuffer result = ByteBuffer.allocate(remaining);
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ result.put(byteBuffer);
+ }
+ result.flip();
+ return result;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4d53751..dd59757 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -19,58 +19,12 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class FrameTransport implements AutoCloseable
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
{
- public static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout",6000);
- private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
- private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-
- private final EventLoopGroup _workerGroup;
- private final InetSocketAddress _brokerAddress;
- private final boolean _isSasl;
-
- private Channel _channel;
- private volatile boolean _channelClosedSeen = false;
-
public FrameTransport(final InetSocketAddress brokerAddress)
{
this(brokerAddress, false);
@@ -78,165 +32,20 @@ public class FrameTransport implements AutoCloseable
public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
{
- _brokerAddress = brokerAddress;
- _isSasl = isSasl;
- _workerGroup = new NioEventLoopGroup();
- }
-
- public InetSocketAddress getBrokerAddress()
- {
- return _brokerAddress;
+ super(brokerAddress, new FrameDecoder(isSasl), new FrameEncoder());
}
+ @Override
public FrameTransport connect()
{
- try
- {
- Bootstrap b = new Bootstrap();
- b.group(_workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>()
- {
- @Override
- public void initChannel(SocketChannel ch) throws Exception
- {
- ChannelPipeline pipeline = ch.pipeline();
- buildInputOutputPipeline(pipeline);
- }
- });
-
- _channel = b.connect(_brokerAddress).sync().channel();
- _channel.closeFuture().addListener(future ->
- {
- _channelClosedSeen = true;
- _queue.add(CHANNEL_CLOSED_RESPONSE);
- });
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ super.connect();
return this;
}
- protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
- {
- pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
- }
-
@Override
- public void close() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.disconnect().sync();
- _channel.close().sync();
- _channel = null;
- }
- }
- finally
- {
- _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
- }
- }
-
- public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
- {
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(bytes);
- _channel.write(buffer, promise);
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
-
- public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
+ public byte[] getProtocolHeader()
{
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- final TransportFrame transportFrame;
- try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
- {
- final QpidByteBuffer duplicate;
- if (payload == null)
- {
- duplicate = null;
- }
- else
- {
- duplicate = payload.duplicate();
- }
- transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
- _channel.write(transportFrame, promise);
- _channel.flush();
- final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
- if (frameBody instanceof Transfer)
- {
- listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
- }
- if (duplicate != null)
- {
- listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
- }
- return listenableFuture;
- }
- }
-
- public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
- {
- SASLFrame transportFrame = new SASLFrame(frameBody);
- ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
- channelFuture.sync();
- return JdkFutureAdapters.listenInPoolThread(channelFuture);
- }
-
- public <T extends Response<?>> T getNextResponse() throws Exception
- {
- return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- }
-
- public void doCloseConnection() throws Exception
- {
- Close close = new Close();
-
- sendPerformative(close, UnsignedShort.valueOf((short) 0));
- PerformativeResponse response = getNextResponse();
- if (!(response.getBody() instanceof Close))
- {
- throw new IllegalStateException(String.format(
- "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
- }
- }
-
- public void assertNoMoreResponses() throws Exception
- {
- Response response = getNextResponse();
- assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
- }
-
- public void assertNoMoreResponsesAndChannelClosed() throws Exception
- {
- assertNoMoreResponses();
- assertThat(_channelClosedSeen, is(true));
- }
-
- private static class ChannelClosedResponse implements Response<Void>
- {
- @Override
- public String toString()
- {
- return "ChannelClosed";
- }
-
- @Override
- public Void getBody()
- {
- return null;
- }
+ return "AMQP\0\1\0\0".getBytes(UTF_8);
}
public Interaction newInteraction()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
deleted file mode 100644
index 9503113..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-public class HeaderResponse implements Response<byte[]>
-{
- private final byte[] _header;
-
- public HeaderResponse(final byte[] header)
- {
- _header = header;
- }
-
- @Override
- public byte[] getBody()
- {
- return _header;
- }
-
- @Override
- public String toString()
- {
- return "HeaderResponse{" +
- "_header=" + Arrays.toString(_header) +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
deleted file mode 100644
index e3acd24..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.ReferenceCountUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class InputHandler extends ChannelInboundHandlerAdapter
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
- private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
-
- private enum ParsingState
- {
- HEADER,
- PERFORMATIVES
- }
-
- private final MyConnectionHandler _connectionHandler;
- private final ValueHandler _valueHandler;
- private final BlockingQueue<Response<?>> _responseQueue;
-
- private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
- private volatile FrameHandler _frameHandler;
- private volatile ParsingState _state = ParsingState.HEADER;
-
- public InputHandler(final BlockingQueue<Response<?>> queue, final boolean isSasl)
- {
-
- _valueHandler = new ValueHandler(TYPE_REGISTRY);
- _connectionHandler = new MyConnectionHandler();
- _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
-
- _responseQueue = queue;
- }
-
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
- {
- ByteBuf buf = (ByteBuf) msg;
- QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
- qpidBuf.put(buf.nioBuffer());
- qpidBuf.flip();
- LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
-
- if (_inputBuffer.hasRemaining())
- {
- QpidByteBuffer old = _inputBuffer;
- _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
- _inputBuffer.put(old);
- _inputBuffer.put(qpidBuf);
- old.dispose();
- qpidBuf.dispose();
- _inputBuffer.flip();
- }
- else
- {
- _inputBuffer.dispose();
- _inputBuffer = qpidBuf;
- }
-
- doParsing();
-
- LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
-
- if (_inputBuffer.hasRemaining())
- {
- _inputBuffer.compact();
- _inputBuffer.flip();
- }
-
- ReferenceCountUtil.release(msg);
- }
-
- private void doParsing()
- {
- switch(_state)
- {
- case HEADER:
- if (_inputBuffer.remaining() >= 8)
- {
- byte[] header = new byte[8];
- _inputBuffer.get(header);
- _responseQueue.add(new HeaderResponse(header));
- _state = ParsingState.PERFORMATIVES;
- doParsing();
- }
- break;
- case PERFORMATIVES:
- _frameHandler.parse(_inputBuffer);
- break;
- default:
- throw new IllegalStateException("Unexpected state : " + _state);
- }
- }
-
- private void resetInputHandlerAfterSaslOutcome()
- {
- _state = ParsingState.HEADER;
- _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
- }
-
- private class MyConnectionHandler implements ConnectionHandler
- {
- private volatile int _frameSize = 512;
-
- @Override
- public void receiveOpen(final int channel, final Open close)
- {
- }
-
- @Override
- public void receiveClose(final int channel, final Close close)
- {
-
- }
-
- @Override
- public void receiveBegin(final int channel, final Begin begin)
- {
-
- }
-
- @Override
- public void receiveEnd(final int channel, final End end)
- {
-
- }
-
- @Override
- public void receiveAttach(final int channel, final Attach attach)
- {
-
- }
-
- @Override
- public void receiveDetach(final int channel, final Detach detach)
- {
-
- }
-
- @Override
- public void receiveTransfer(final int channel, final Transfer transfer)
- {
-
- }
-
- @Override
- public void receiveDisposition(final int channel, final Disposition disposition)
- {
-
- }
-
- @Override
- public void receiveFlow(final int channel, final Flow flow)
- {
-
- }
-
- @Override
- public int getMaxFrameSize()
- {
- return _frameSize;
- }
-
- @Override
- public int getChannelMax()
- {
- return UnsignedShort.MAX_VALUE.intValue();
- }
-
- @Override
- public void handleError(final Error parsingError)
- {
- LOGGER.error("Unexpected error {}", parsingError);
- }
-
- @Override
- public boolean closedForInput()
- {
- return false;
- }
-
- @Override
- public void receive(final List<ChannelFrameBody> channelFrameBodies)
- {
- for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
- {
- Response response;
- Object val = channelFrameBody.getFrameBody();
- int channel = channelFrameBody.getChannel();
- if (val instanceof FrameBody)
- {
- FrameBody frameBody = (FrameBody) val;
- if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
- {
- _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
- }
- response = new PerformativeResponse((short) channel, frameBody);
- }
- else if (val instanceof SaslFrameBody)
- {
- SaslFrameBody frameBody = (SaslFrameBody) val;
- response = new SaslPerformativeResponse((short) channel, frameBody);
-
- if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
- {
- resetInputHandlerAfterSaslOutcome();
- }
- }
- else
- {
- throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
- }
-
- _responseQueue.add(response);
- }
- }
-
- @Override
- public void receiveSaslInit(final SaslInit saslInit)
- {
-
- }
-
- @Override
- public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
- {
-
- }
-
- @Override
- public void receiveSaslChallenge(final SaslChallenge saslChallenge)
- {
-
- }
-
- @Override
- public void receiveSaslResponse(final SaslResponse saslResponse)
- {
-
- }
-
- @Override
- public void receiveSaslOutcome(final SaslOutcome saslOutcome)
- {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 52518ab..7d73ce8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,8 +20,6 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static com.google.common.util.concurrent.Futures.allAsList;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,21 +28,19 @@ import static org.hamcrest.Matchers.is;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -81,8 +77,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
-public class Interaction
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
{
private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Begin _begin;
@@ -94,14 +91,11 @@ public class Interaction
private final Flow _flow;
private final Transfer _transfer;
private final Disposition _disposition;
- private final FrameTransport _transport;
private final SaslInit _saslInit;
private final SaslResponse _saslResponse;
private byte[] _protocolHeader;
private UnsignedShort _connectionChannel;
private UnsignedShort _sessionChannel;
- private Response<?> _latestResponse;
- private ListenableFuture<?> _latestFuture;
private int _deliveryIdCounter;
private List<Transfer> _latestDelivery;
private Object _decodedLatestDelivery;
@@ -109,10 +103,10 @@ public class Interaction
Interaction(final FrameTransport frameTransport)
{
+ super(frameTransport);
final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
- _transport = frameTransport;
- _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+ _protocolHeader = frameTransport.getProtocolHeader();
_saslInit = new SaslInit();
_saslResponse = new SaslResponse();
@@ -154,6 +148,19 @@ public class Interaction
_disposition.setFirst(UnsignedInteger.ZERO);
}
+ public void doCloseConnection() throws Exception
+ {
+ Close close = new Close();
+
+ sendPerformative(close, UnsignedShort.valueOf((short) 0));
+ Response<?> response = getNextResponse();
+ if (!(response.getBody() instanceof Close))
+ {
+ throw new IllegalStateException(String.format(
+ "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
+ }
+ }
+
/////////////////////////
// Protocol Negotiation //
/////////////////////////
@@ -164,17 +171,15 @@ public class Interaction
return this;
}
- public Interaction negotiateProtocol() throws Exception
+ @Override
+ protected byte[] getProtocolHeader()
+ {
+ return _protocolHeader;
+ }
+
+ @Override
+ protected Interaction getInteraction()
{
- final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
return this;
}
@@ -977,83 +982,35 @@ public class Interaction
private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
+ SASLFrame transportFrame = new SASLFrame(frameBody);
+ sendPerformativeAndChainFuture(transportFrame, true);
}
private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
- }
-
- public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
- {
- sync();
- _latestResponse = _transport.getNextResponse();
- final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
- if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
- || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+ final TransportFrame transportFrame;
+ try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
{
- return this;
- }
- acceptableResponseClasses.remove(null);
- if (_latestResponse != null)
- {
- for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ final QpidByteBuffer duplicate;
+ if (payload == null)
{
- if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- return this;
- }
+ duplicate = null;
+ }
+ else
+ {
+ duplicate = payload.duplicate();
+ }
+ transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+ ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+ if (frameBody instanceof Transfer)
+ {
+ listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+ }
+ if (duplicate != null)
+ {
+ listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
}
}
- throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
- acceptableResponseClasses,
- _latestResponse == null ? null : _latestResponse.getBody()));
- }
-
- public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
- {
- if (_latestFuture != null)
- {
- _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- _latestFuture = null;
- }
- return this;
- }
-
- public Response<?> getLatestResponse() throws Exception
- {
- sync();
- return _latestResponse;
- }
-
- public <T> T getLatestResponse(Class<T> type) throws Exception
- {
- sync();
- if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
- type.getSimpleName(),
- _latestResponse.getBody()));
- }
-
- return (T) _latestResponse.getBody();
}
public Interaction flowHandleFromLinkHandle()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
deleted file mode 100644
index 029dc70..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-
-public class Matchers
-{
- public static Matcher<Response> protocolHeader(byte[] expectedHeader)
- {
- return new BaseMatcher<Response>()
- {
- @Override
- public void describeTo(final Description description)
- {
- description.appendValue(new HeaderResponse(expectedHeader));
- }
-
- @Override
- public boolean matches(final Object o)
- {
- if (o == null)
- {
- return false;
- }
- if (!(o instanceof HeaderResponse))
- {
- return false;
- }
- if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
- {
- return false;
- }
- return true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
deleted file mode 100644
index 68f4322..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class OutputHandler extends ChannelOutboundHandlerAdapter
-{
- private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
-
-
- @Override
- public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
- {
-
- if (msg instanceof AMQFrame)
- {
- FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
- {
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- @Override
- public void send(final QpidByteBuffer msg)
- {
- byte[] data = new byte[msg.remaining()];
- msg.get(data);
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(data);
- try
- {
- OutputHandler.super.write(ctx, buffer, promise);
- }
- catch (Exception e)
- {
- promise.setFailure(e);
- }
- }
-
- @Override
- public void flush()
- {
- }
-
- @Override
- public void close()
- {
-
- }
- });
- _frameWriter.send(((AMQFrame) msg));
- }
- else
- {
- super.write(ctx, msg, promise);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
index 06a64dc..9e03a26 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
@@ -20,6 +20,7 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.tests.protocol.Response;
public class PerformativeResponse implements Response<FrameBody>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
deleted file mode 100644
index a7e341c..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-public interface Response<T>
-{
- T getBody();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
index 08893e0..02ab3c9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.tests.protocol.Response;
public class SaslPerformativeResponse implements Response<SaslFrameBody>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
deleted file mode 100644
index ea3d164..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface SpecificationTest
-{
- String section();
- String description();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 8a150fa..1ca4419 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -54,6 +54,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 817be05..a3270a0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
@@ -108,7 +108,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
interaction.consumeResponse().getLatestResponse(Flow.class);
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
index c75bb20..524d991 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
@@ -46,10 +46,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -125,7 +124,7 @@ public class ManagementTest extends BrokerAdminUsingTestBase
assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle())));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index a39b1b3..e85f356 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -39,10 +39,11 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class WebSocketTest extends BrokerAdminUsingTestBase
{
@@ -73,7 +74,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
{
- final Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ final Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse()
.getLatestResponse(Open.class);
@@ -84,7 +86,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -97,7 +99,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
{
- final Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ final Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse()
.getLatestResponse(Open.class);
@@ -108,7 +111,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 6ba9058..36203a4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -48,7 +48,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
public class DeleteOnCloseTest extends BrokerAdminUsingTestBase
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 97fe3bb..1cf1ff0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index bcd155f..0a45410 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -52,9 +52,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 1853446..cc377cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -99,7 +99,7 @@ public class OutcomeTest extends BrokerAdminUsingTestBase
assertThat(secondDeliveryPayload, is(equalTo("message2")));
// verify that no unexpected performative is received by closing
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index eb72532..245c624 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -76,8 +76,8 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -439,7 +439,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.disposition();
// verify that no unexpected performative is received by closing
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -688,8 +688,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
assertThat(isSettled.get(), is(true));
// verify no unexpected performative received by closing the connection
- transport.doCloseConnection();
-
+ interaction.doCloseConnection();
}
}
@@ -796,7 +795,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.transfer()
.sync();
- transport.doCloseConnection();
+ interaction.doCloseConnection();
assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 42f6114..bd7c113 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1496d13..fb61974 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -57,9 +57,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 619e5d9..db91db1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class ProtocolHeaderTest extends BrokerAdminUsingTestBase
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index b3f57c1..ab570da 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class OpenTest extends BrokerAdminUsingTestBase
@@ -77,7 +77,8 @@ public class OpenTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
{
- Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.openContainerId("testContainerId")
.open().consumeResponse()
@@ -88,7 +89,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index c559399..774cc00 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -43,7 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org