You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/06/10 17:27:55 UTC
qpid-broker-j git commit: QPID-7820: [Java Broker] [Protocol Tests]
Extend protocol test framework to test AMQP 1.0 websocket too
Repository: qpid-broker-j
Updated Branches:
refs/heads/master f03a3c460 -> b36c7180f
QPID-7820: [Java Broker] [Protocol Tests] Extend protocol test framework to test AMQP 1.0 websocket too
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/b36c7180
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/b36c7180
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/b36c7180
Branch: refs/heads/master
Commit: b36c7180f0d90a332f065e9a348e8dc0ae1972d4
Parents: f03a3c4
Author: Keith Wall <ke...@gmail.com>
Authored: Sat Jun 10 17:36:35 2017 +0100
Committer: Keith Wall <ke...@gmail.com>
Committed: Sat Jun 10 18:24:21 2017 +0100
----------------------------------------------------------------------
.../transport/websocket/WebSocketProvider.java | 5 +-
pom.xml | 5 +
systests/protocol-tests-amqp-1-0/pom.xml | 9 +
.../qpid/tests/protocol/v1_0/BrokerAdmin.java | 1 +
.../tests/protocol/v1_0/FrameTransport.java | 65 +++---
.../apache/qpid/tests/protocol/v1_0/Utils.java | 4 +-
.../websocket/WebSocketFrameTransport.java | 224 +++++++++++++++++++
.../main/resources/config-protocol-tests.json | 20 ++
.../bindmapjms/TemporaryDestinationTest.java | 4 +-
.../soleconn/CloseExistingPolicy.java | 14 +-
.../v1_0/extensions/soleconn/MixedPolicy.java | 10 +-
.../soleconn/RefuseConnectionPolicy.java | 18 +-
.../extensions/websocket/WebSocketTest.java | 114 ++++++++++
.../v1_0/messaging/DeleteOnCloseTest.java | 8 +-
.../protocol/v1_0/messaging/TransferTest.java | 12 +-
.../v1_0/transport/ProtocolHeaderTest.java | 19 +-
.../v1_0/transport/connection/OpenTest.java | 8 +-
.../v1_0/transport/link/AttachTest.java | 6 +-
.../protocol/v1_0/transport/link/FlowTest.java | 8 +-
.../v1_0/transport/security/sasl/SaslTest.java | 16 +-
.../v1_0/transport/session/BeginTest.java | 6 +-
21 files changed, 468 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
----------------------------------------------------------------------
diff --git a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
index f1b5e2d..7784dac 100644
--- a/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
+++ b/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
@@ -429,7 +429,10 @@ class WebSocketProvider implements AcceptingTransport
@Override
public void onWebSocketClose(final int statusCode, final String reason)
{
- _protocolEngine.closed();
+ if (_protocolEngine != null)
+ {
+ _protocolEngine.closed();
+ }
_activeConnections.remove(_connectionWrapper);
_idleTimeoutChecker.wakeup();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index da1fe62..45f6fa1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -371,6 +371,11 @@
<version>${netty-version}</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ <version>${netty-version}</version>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest-version}</version>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index ef33da3..9aa5295 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -43,6 +43,11 @@
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-websocket</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-codegen</artifactId>
<version>${project.version}</version>
</dependency>
@@ -111,6 +116,10 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
index e4efc76..a263b2d 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
@@ -48,6 +48,7 @@ public interface BrokerAdmin extends Pluggable
enum PortType
{
ANONYMOUS_AMQP,
+ ANONYMOUS_AMQPWS,
AMQP
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index eda903a..16eb3d8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -27,7 +27,6 @@ import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
@@ -38,6 +37,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
@@ -47,6 +47,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@@ -78,10 +79,12 @@ public class FrameTransport implements AutoCloseable
private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
- private final Channel _channel;
private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100);
private final EventLoopGroup _workerGroup;
+ private final InetSocketAddress _brokerAddress;
+ private final boolean _isSasl;
+ private Channel _channel;
private volatile boolean _channelClosedSeen = false;
private int _amqpConnectionId;
private short _amqpChannelId;
@@ -93,8 +96,18 @@ public class FrameTransport implements AutoCloseable
public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
{
+ _brokerAddress = brokerAddress;
+ _isSasl = isSasl;
_workerGroup = new NioEventLoopGroup();
+ }
+
+ public InetSocketAddress getBrokerAddress()
+ {
+ return _brokerAddress;
+ }
+ public FrameTransport connect()
+ {
try
{
Bootstrap b = new Bootstrap();
@@ -106,11 +119,12 @@ public class FrameTransport implements AutoCloseable
@Override
public void initChannel(SocketChannel ch) throws Exception
{
- ch.pipeline().addLast(new InputHandler(_queue, isSasl)).addLast(new OutputHandler());
+ ChannelPipeline pipeline = ch.pipeline();
+ buildInputOutputPipeline(pipeline);
}
});
- _channel = b.connect(brokerAddress).sync().channel();
+ _channel = b.connect(_brokerAddress).sync().channel();
_channel.closeFuture().addListener(future ->
{
_channelClosedSeen = true;
@@ -121,6 +135,12 @@ public class FrameTransport implements AutoCloseable
{
throw new RuntimeException(e);
}
+ return this;
+ }
+
+ protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+ {
+ pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
}
@Override
@@ -128,8 +148,12 @@ public class FrameTransport implements AutoCloseable
{
try
{
- _channel.disconnect().sync();
- _channel.close().sync();
+ if (_channel != null)
+ {
+ _channel.disconnect().sync();
+ _channel.close().sync();
+ _channel = null;
+ }
}
finally
{
@@ -140,6 +164,7 @@ public class FrameTransport implements AutoCloseable
public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
{
+ Preconditions.checkState(_channel != null, "Not connected");
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bytes);
ChannelFuture channelFuture = _channel.writeAndFlush(buffer);
@@ -149,6 +174,7 @@ public class FrameTransport implements AutoCloseable
public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
{
+ Preconditions.checkState(_channel != null, "Not connected");
final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null;
TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload);
ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
@@ -372,7 +398,6 @@ public class FrameTransport implements AutoCloseable
assertThat(_channelClosedSeen, is(true));
}
-
private int getConnectionId()
{
if (_amqpConnectionId == 0)
@@ -386,32 +411,6 @@ public class FrameTransport implements AutoCloseable
return _amqpConnectionId;
}
- public void assertChannelClosed()
- {
- try
- {
- ChannelFuture channelFuture = _channel.write(new byte[]{0});
- channelFuture.sync();
- throw new IllegalStateException(
- "Expecting the channel to be already closed by, but it was able to take more input.");
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- catch (Exception e)
- {
- if (e instanceof ClosedChannelException)
- {
- // PASS
- }
- else
- {
- throw new IllegalStateException("Unexpected exception", e);
- }
- }
- }
-
private static class ChannelClosedResponse implements Response
{
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index a9491e3..b892589 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -43,7 +43,7 @@ public class Utils
public static boolean doesNodeExist(final InetSocketAddress brokerAddress,
final String nodeAddress) throws Exception
{
- try (FrameTransport transport = new FrameTransport(brokerAddress))
+ try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
{
transport.doBeginSession();
@@ -86,7 +86,7 @@ public class Utils
public static Object receiveMessage(final InetSocketAddress brokerAddress,
final String queueName) throws Exception
{
- try (FrameTransport transport = new FrameTransport(brokerAddress))
+ try (FrameTransport transport = new FrameTransport(brokerAddress).connect())
{
transport.doAttachReceivingLink(queueName);
Flow flow = new Flow();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
new file mode 100644
index 0000000..b5ccd08
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketFrameTransport.java
@@ -0,0 +1,224 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.websocket;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+
+public class WebSocketFrameTransport extends FrameTransport
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketFrameTransport.class);
+
+ private WebSocketFramingOutputHandler _webSocketFramingOutputHandler;
+ private WebSocketDeframingInputHandler _webSocketDeframingInputHandler;
+ private WebSocketClientHandler _webSocketClientHandler;
+
+ public WebSocketFrameTransport(final InetSocketAddress addr)
+ {
+ super(addr);
+ }
+
+ @Override
+ protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+ {
+ URI uri = URI.create(String.format("tcp://%s:%d/",
+ getBrokerAddress().getHostString(),
+ getBrokerAddress().getPort()));
+ _webSocketClientHandler = new WebSocketClientHandler(
+ WebSocketClientHandshakerFactory.newHandshaker(
+ uri, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders()), uri);
+ _webSocketFramingOutputHandler = new WebSocketFramingOutputHandler();
+ _webSocketDeframingInputHandler = new WebSocketDeframingInputHandler();
+
+ pipeline.addLast(new HttpClientCodec());
+ pipeline.addLast(new HttpObjectAggregator(65536));
+ pipeline.addLast(_webSocketClientHandler);
+ pipeline.addLast(_webSocketFramingOutputHandler);
+ pipeline.addLast(_webSocketDeframingInputHandler);
+ super.buildInputOutputPipeline(pipeline);
+ }
+
+ @Override
+ public WebSocketFrameTransport connect()
+ {
+ super.connect();
+ _webSocketClientHandler.handshakeFuture().syncUninterruptibly();
+ return this;
+ }
+
+ WebSocketFrameTransport splitAmqpFrames()
+ {
+ _webSocketFramingOutputHandler.splitAmqpFrames();
+ return this;
+ }
+
+ private class WebSocketFramingOutputHandler extends ChannelOutboundHandlerAdapter
+ {
+ private boolean _splitFrames;
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception
+ {
+ if (msg instanceof ByteBuf)
+ {
+ final ByteBuf buf = ((ByteBuf) msg);
+ if (_splitFrames && buf.isReadable())
+ {
+ // Split mode: one binary frame per byte; only the final write carries the caller's promise.
+ while (buf.readableBytes() > 1)
+ {
+ ctx.write(new BinaryWebSocketFrame(Unpooled.copiedBuffer(new byte[] {buf.readByte()})));
+ }
+ ctx.write(new BinaryWebSocketFrame(Unpooled.copiedBuffer(new byte[] {buf.readByte()})), promise);
+ buf.release(); // every byte was copied into its own frame, so the source buffer is consumed here
+ }
+ else
+ {
+ BinaryWebSocketFrame frame = new BinaryWebSocketFrame(buf); // whole buffer (or empty one) as one frame
+ ctx.write(frame, promise);
+ }
+ }
+ else
+ {
+ ctx.write(msg, promise); // non-ByteBuf messages (e.g. the HTTP upgrade request) pass through untouched
+ }
+ }
+
+ void splitAmqpFrames()
+ {
+ _splitFrames = true;
+ }
+ }
+
+ private class WebSocketDeframingInputHandler extends ChannelInboundHandlerAdapter
+ {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ {
+ if (msg instanceof WebSocketFrame)
+ {
+ WebSocketFrame frame = (WebSocketFrame) msg;
+ ctx.fireChannelRead(frame.content());
+ }
+ else
+ {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx)
+ {
+ ctx.flush();
+ }
+ }
+
+ public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object>
+ {
+
+ private final WebSocketClientHandshaker _handshaker;
+ private ChannelPromise _handshakeFuture;
+
+ WebSocketClientHandler(final WebSocketClientHandshaker handshaker, final URI uri)
+ {
+ _handshaker = handshaker;
+ }
+
+ ChannelFuture handshakeFuture()
+ {
+ return _handshakeFuture;
+ }
+
+ @Override
+ public void handlerAdded(final ChannelHandlerContext ctx)
+ {
+ _handshakeFuture = ctx.newPromise();
+ }
+
+ @Override
+ public void channelActive(final ChannelHandlerContext ctx)
+ {
+ _handshaker.handshake(ctx.channel());
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object msg)
+ {
+ final Channel ch = ctx.channel();
+ if (!_handshaker.isHandshakeComplete())
+ {
+ // Until the handshake completes, the inbound message is treated as the HTTP upgrade response.
+ _handshaker.finishHandshake(ch, (FullHttpResponse) msg);
+ _handshakeFuture.setSuccess();
+ return;
+ }
+ // After the upgrade only WebSocket frames are expected; another HTTP response is a protocol error.
+ if (msg instanceof FullHttpResponse)
+ {
+ final FullHttpResponse response = (FullHttpResponse) msg;
+ throw new IllegalStateException(String.format("Unexpected FullHttpResponse (getStatus=%s, content=%s)",
+ response.status(), response.content().toString(StandardCharsets.UTF_8)));
+ }
+ // SimpleChannelInboundHandler releases msg when this method returns, so retain it for the next handler.
+ WebSocketFrame frame = (WebSocketFrame) msg;
+ ctx.fireChannelRead(frame.retain());
+ }
+
+ @Override
+ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause)
+ {
+ LOGGER.error("exceptionCaught", cause);
+
+ if (!_handshakeFuture.isDone())
+ {
+ _handshakeFuture.setFailure(cause);
+ }
+ ctx.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
index 1aaa210..764ff89 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
+++ b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
@@ -73,6 +73,26 @@
"type" : "nameAlias",
"durable" : true
} ]
+ }, {
+ "name" : "ANONYMOUS_AMQPWS",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "transports" : ["WS"],
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
} ],
"virtualhostnodes" : []
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 9f7c868..5583f2a 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -82,7 +82,7 @@ public class TemporaryDestinationTest extends ProtocolTestBase
{
String newTemporaryNodeAddress = null;
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doBeginSession();
@@ -124,7 +124,7 @@ public class TemporaryDestinationTest extends ProtocolTestBase
transport.doCloseConnection();
}
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doBeginSession();
assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index b250bd9..c3e5999 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -64,7 +64,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
@Test
public void basicNegotiation() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doProtocolNegotiation();
Open open = new Open();
@@ -92,7 +92,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
@Test
public void existingConnectionClosed() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -107,7 +107,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
@@ -145,7 +145,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
@Test
public void weakDetection() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -158,7 +158,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
@@ -195,7 +195,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
@Test
public void strongDetection() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -217,7 +217,7 @@ public class CloseExistingPolicy extends ProtocolTestBase
is(equalTo(SoleConnectionDetectionPolicy.STRONG.getValue())));
}
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
index 857368c..0248a9f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/MixedPolicy.java
@@ -56,7 +56,7 @@ public class MixedPolicy extends ProtocolTestBase
@Test
public void firstCloseThenRefuse() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -71,7 +71,7 @@ public class MixedPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
@@ -90,7 +90,7 @@ public class MixedPolicy extends ProtocolTestBase
assertThat(response2, is(notNullValue()));
assertThat(response2.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport3 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport3 = new FrameTransport(_brokerAddress).connect())
{
transport3.doProtocolNegotiation();
Open open3 = new Open();
@@ -115,7 +115,7 @@ public class MixedPolicy extends ProtocolTestBase
@Test
public void firstRefuseThenClose() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -130,7 +130,7 @@ public class MixedPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java
index d0f3f04..409cc17 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/RefuseConnectionPolicy.java
@@ -65,7 +65,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
@Test
public void basicNegotiation() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect();)
{
transport.doProtocolNegotiation();
Open open = new Open();
@@ -93,7 +93,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
@Test
public void newConnectionRefused() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -108,7 +108,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
@@ -142,7 +142,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
@Test
public void weakDetection() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -155,7 +155,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
@@ -188,7 +188,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
@Test
public void strongDetection() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -210,7 +210,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
is(equalTo(SoleConnectionDetectionPolicy.STRONG.getValue())));
}
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
@@ -241,7 +241,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
@Test
public void refuseIsDefault() throws Exception
{
- try (FrameTransport transport1 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport1 = new FrameTransport(_brokerAddress).connect())
{
transport1.doProtocolNegotiation();
Open open = new Open();
@@ -255,7 +255,7 @@ public class RefuseConnectionPolicy extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
- try (FrameTransport transport2 = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport2 = new FrameTransport(_brokerAddress).connect())
{
transport2.doProtocolNegotiation();
Open open2 = new Open();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
new file mode 100644
index 0000000..23026d8
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -0,0 +1,114 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.extensions.websocket;
+
+import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertArrayEquals;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+/**
+ * Protocol tests for AMQP 1.0 over WebSocket, run against the broker's ANONYMOUS_AMQPWS port.
+ */
+public class WebSocketTest extends ProtocolTestBase
+{
+    private static final byte[] AMQP_HEADER_BYTES = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+
+    @Test
+    @SpecificationTest(section = "2.1", description = "Opening a WebSocket Connection")
+    public void protocolHeader() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
+        try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
+        {
+            exchangeProtocolHeader(transport);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.4", description = "[...] a single AMQP frame MAY be split over one or more consecutive WebSocket messages. ")
+    @Ignore("QPID-7817")
+    public void amqpFramesSplitOverManyWebSocketFrames() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
+        try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
+        {
+            exchangeProtocolHeader(transport);
+            openAndCloseConnection(transport);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.1", description = "Opening a WebSocket Connection")
+    public void successfulOpen() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
+        try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
+        {
+            transport.doProtocolNegotiation();
+            openAndCloseConnection(transport);
+        }
+    }
+
+    /** Sends the AMQP protocol header and asserts that the response carries the same header bytes. */
+    private void exchangeProtocolHeader(final FrameTransport transport) throws Exception
+    {
+        transport.sendProtocolHeader(AMQP_HEADER_BYTES);
+        final HeaderResponse response = transport.getNextResponse(HeaderResponse.class);
+        assertArrayEquals("Unexpected protocol header response", AMQP_HEADER_BYTES, response.getHeader());
+    }
+
+    /** Sends Open on channel 0, checks the fields of the Open received in reply, then closes the connection. */
+    private void openAndCloseConnection(final FrameTransport transport) throws Exception
+    {
+        final Open open = new Open();
+        open.setContainerId("testContainerId");
+        transport.sendPerformative(open, UnsignedShort.valueOf((short) 0));
+        final Open responseOpen = transport.getNextPerformativeResponse(Open.class);
+
+        assertThat(responseOpen.getContainerId(), is(notNullValue()));
+        assertThat(responseOpen.getMaxFrameSize().longValue(),
+                   is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+        assertThat(responseOpen.getChannelMax().intValue(),
+                   is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
+
+        transport.doCloseConnection();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 2a9016d..44b2b21 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -67,7 +67,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase
+ "creation ceases to exist.")
public void deleteOnCloseOnSource() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doBeginSession();
@@ -113,7 +113,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase
+ "creation ceases to exist.")
public void deleteOnCloseOnTarget() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doBeginSession();
@@ -163,7 +163,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase
+ "creation ceases to exist.")
public void doesNotDeleteOnDetach() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doBeginSession();
@@ -208,7 +208,7 @@ public class DeleteOnCloseTest extends ProtocolTestBase
assumeThat(getBrokerAdmin().supportsRestart(), is(true));
final String newTempQueueAddress;
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
transport.doBeginSession();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 92781d5..c099e20 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -79,7 +79,7 @@ public class TransferTest extends ProtocolTestBase
description = "Transfer without mandatory fields should result in a decoding error.")
public void emptyTransfer() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
@@ -103,7 +103,7 @@ public class TransferTest extends ProtocolTestBase
+ "[...] and can only be omitted for continuation transfers.")
public void transferWithoutDeliveryTag() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final UnsignedInteger linkHandle = UnsignedInteger.ONE;
transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
@@ -133,7 +133,7 @@ public class TransferTest extends ProtocolTestBase
public void transferUnsettled() throws Exception
{
String sentData = "foo";
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
@@ -165,7 +165,7 @@ public class TransferTest extends ProtocolTestBase
public void transferReceiverSettleModeFirst() throws Exception
{
String sentData = "foo";
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
Attach attach = new Attach();
@@ -210,7 +210,7 @@ public class TransferTest extends ProtocolTestBase
public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception
{
String sentData = "foo";
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
Attach attach = new Attach();
@@ -253,7 +253,7 @@ public class TransferTest extends ProtocolTestBase
@SpecificationTest(section = "", description = "Pipelined message send")
public void presettledPipelined() throws Exception
{
- try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
byte[] protocolHeader = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
Open open = new Open();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index ed9c10f..b68ae8d 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -33,21 +33,6 @@ import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
-
-/*
-
-TODO
-
-logging - log per test?
-protocol assertions
-admin factory
-performative test
-embedded broker per test admin impl that creates broker per test
-embedded broker per class admin impl creates/destroys vhost per test
-queue creation?
- */
-
-
public class ProtocolHeaderTest extends ProtocolTestBase
{
@Test
@@ -59,7 +44,7 @@ public class ProtocolHeaderTest extends ProtocolTestBase
public void successfulHeaderExchange() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
transport.sendProtocolHeader(bytes);
@@ -76,7 +61,7 @@ public class ProtocolHeaderTest extends ProtocolTestBase
public void unacceptableProtocolIdSent_SaslAcceptable() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
byte[] rawHeaderBytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
byte[] expectedSaslHeaderBytes = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index 29bbd35..d3c83b5 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -55,7 +55,7 @@ public class OpenTest extends ProtocolTestBase
public void emptyOpen() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doProtocolNegotiation();
Open open = new Open();
@@ -80,7 +80,7 @@ public class OpenTest extends ProtocolTestBase
public void successfulOpen() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doProtocolNegotiation();
Open open = new Open();
@@ -107,7 +107,7 @@ public class OpenTest extends ProtocolTestBase
public void failOpenOnChannelNotZero() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doProtocolNegotiation();
Open open = new Open();
@@ -131,7 +131,7 @@ public class OpenTest extends ProtocolTestBase
public void failOpenOnNonExistingHostname() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doProtocolNegotiation();
Open open = new Open();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index b37dff9..c5ce03e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -53,7 +53,7 @@ public class AttachTest extends ProtocolTestBase
public void emptyAttach() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doBeginSession();
Attach attach = new Attach();
@@ -76,7 +76,7 @@ public class AttachTest extends ProtocolTestBase
public void successfulAttachAsSender() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doBeginSession();
Attach attach = new Attach();
@@ -112,7 +112,7 @@ public class AttachTest extends ProtocolTestBase
String queueName = "testQueue";
getBrokerAdmin().createQueue(queueName);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
Role localRole = Role.RECEIVER;
transport.doBeginSession();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 9b4aa9c..1071f84 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -52,7 +52,7 @@ public class FlowTest extends ProtocolTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
Flow flow = new Flow();
@@ -74,7 +74,7 @@ public class FlowTest extends ProtocolTestBase
public void sessionEchoFlow() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doBeginSession();
Flow flow = new Flow();
@@ -102,7 +102,7 @@ public class FlowTest extends ProtocolTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
final UnsignedInteger handle = UnsignedInteger.ONE;
transport.doAttachSendingLink(handle, BrokerAdmin.TEST_QUEUE_NAME);
@@ -152,7 +152,7 @@ public class FlowTest extends ProtocolTestBase
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index ede03a2..a84b8e9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -66,7 +66,7 @@ public class SaslTest extends ProtocolTestBase
public void saslSuccessfulAuthentication() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -99,7 +99,7 @@ public class SaslTest extends ProtocolTestBase
public void saslSuccessfulAuthenticationWithChallengeResponse() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -138,7 +138,7 @@ public class SaslTest extends ProtocolTestBase
public void saslUnsuccessfulAuthentication() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -168,7 +168,7 @@ public class SaslTest extends ProtocolTestBase
public void unsupportedSaslMechanism() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -194,7 +194,7 @@ public class SaslTest extends ProtocolTestBase
public void authenticationBypassDisallowed() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -217,7 +217,7 @@ public class SaslTest extends ProtocolTestBase
public void clientSendsSaslMechanisms() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -238,7 +238,7 @@ public class SaslTest extends ProtocolTestBase
public void clientSendsSaslChallenge() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
@@ -259,7 +259,7 @@ public class SaslTest extends ProtocolTestBase
public void clientSendsSaslOutcome() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
- try (FrameTransport transport = new FrameTransport(addr, true))
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES);
HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/b36c7180/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index d12bb0b..31a48af 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -50,7 +50,7 @@ public class BeginTest extends ProtocolTestBase
public void emptyBegin() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try(FrameTransport transport = new FrameTransport(addr))
+ try(FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doOpenConnection();
Begin begin = new Begin();
@@ -72,7 +72,7 @@ public class BeginTest extends ProtocolTestBase
public void successfulBegin() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
transport.doOpenConnection();
Begin begin = new Begin();
@@ -103,7 +103,7 @@ public class BeginTest extends ProtocolTestBase
public void channelMax() throws Exception
{
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
+ try (FrameTransport transport = new FrameTransport(addr).connect())
{
UnsignedShort channelMax = UnsignedShort.valueOf((short) 5);
transport.doProtocolNegotiation();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org