You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/10 14:44:40 UTC
[1/6] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests]
Introduce new module 'protocol-tests-core' and move test common functionality
into it
Repository: qpid-broker-j
Updated Branches:
refs/heads/7.0.x 9a7628595 -> 7d8391ff7
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index ed72ba3..1a38cf0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 35ee4e7..ad56ea9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -69,9 +69,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -459,7 +459,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
}
}
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -588,7 +588,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
.disposition();
}
- transport.doCloseConnection();
+ interaction.doCloseConnection();
if (getBrokerAdmin().isQueueDepthSupported())
{
@@ -672,7 +672,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
.dispositionRole(Role.RECEIVER)
.disposition();
- transport.doCloseConnection();
+ interaction.doCloseConnection();
if (getBrokerAdmin().isQueueDepthSupported())
{
@@ -771,7 +771,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
}
while (!settled);
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index cf12b05..6fea928 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -49,7 +49,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class SaslTest extends BrokerAdminUsingTestBase
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 352fd19..24cb435 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -36,10 +36,11 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class BeginTest extends BrokerAdminUsingTestBase
{
@@ -74,7 +75,8 @@ public class BeginTest extends BrokerAdminUsingTestBase
try (FrameTransport transport = new FrameTransport(addr).connect())
{
final UnsignedShort channel = UnsignedShort.valueOf(37);
- Begin responseBegin = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ Begin responseBegin = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.sessionChannel(channel)
@@ -85,7 +87,7 @@ public class BeginTest extends BrokerAdminUsingTestBase
assertThat(responseBegin.getOutgoingWindow(), is(instanceOf(UnsignedInteger.class)));
assertThat(responseBegin.getNextOutgoingId(), is(instanceOf(UnsignedInteger.class)));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/pom.xml b/systests/protocol-tests-core/pom.xml
new file mode 100644
index 0000000..8c3bc87
--- /dev/null
+++ b/systests/protocol-tests-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ ~
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.0.1-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-core</artifactId>
+ <name>Apache Qpid Broker-J Protocol Tests Core</name>
+ <description>Core classes for Apache Qpid protocol tests</description>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
new file mode 100644
index 0000000..daf500d
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public abstract class FrameTransport implements AutoCloseable
+{
+ public static final long RESPONSE_TIMEOUT =
+ Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+ private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+ private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+ private final EventLoopGroup _workerGroup;
+ private final InetSocketAddress _brokerAddress;
+ private final InputHandler _inputHandler;
+ private final OutputHandler _outputHandler;
+
+ private volatile Channel _channel;
+ private volatile boolean _channelClosedSeen = false;
+
+ public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+ {
+ _brokerAddress = brokerAddress;
+ _inputHandler = new InputHandler(_queue, inputDecoder);
+ _outputHandler = new OutputHandler(outputEncoder);
+ _workerGroup = new NioEventLoopGroup();
+ }
+
+ public InetSocketAddress getBrokerAddress()
+ {
+ return _brokerAddress;
+ }
+
+ public FrameTransport connect()
+ {
+ try
+ {
+ Bootstrap b = new Bootstrap();
+ b.group(_workerGroup);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+ b.handler(new ChannelInitializer<SocketChannel>()
+ {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception
+ {
+ ChannelPipeline pipeline = ch.pipeline();
+ buildInputOutputPipeline(pipeline);
+ }
+ });
+
+ _channel = b.connect(_brokerAddress).sync().channel();
+ _channel.closeFuture().addListener(future ->
+ {
+ _channelClosedSeen = true;
+ _queue.add(CHANNEL_CLOSED_RESPONSE);
+ });
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return this;
+ }
+
+ protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+ {
+ pipeline.addLast(_inputHandler).addLast(_outputHandler);
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ try
+ {
+ if (_channel != null)
+ {
+ _channel.disconnect().sync();
+ _channel.close().sync();
+ _channel = null;
+ }
+ }
+ finally
+ {
+ _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+ }
+ }
+
+ public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+ {
+ Preconditions.checkState(_channel != null, "Not connected");
+ ChannelPromise promise = _channel.newPromise();
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(bytes);
+ _channel.write(buffer, promise);
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+
+ public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+ {
+ Preconditions.checkState(_channel != null, "Not connected");
+ if (!sync)
+ {
+ ChannelPromise promise = _channel.newPromise();
+ _channel.write(data, promise);
+ _channel.flush();
+ return JdkFutureAdapters.listenInPoolThread(promise);
+ }
+ else
+ {
+ ChannelFuture channelFuture = _channel.writeAndFlush(data);
+ channelFuture.sync();
+ return Futures.immediateFuture(null);
+ }
+ }
+
+ public <T extends Response<?>> T getNextResponse() throws Exception
+ {
+ return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public void assertNoMoreResponses() throws Exception
+ {
+ Response response = getNextResponse();
+ assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+ }
+
+ public void assertNoMoreResponsesAndChannelClosed() throws Exception
+ {
+ assertNoMoreResponses();
+ assertThat(_channelClosedSeen, is(true));
+ }
+
+ private static class ChannelClosedResponse implements Response<Void>
+ {
+ @Override
+ public String toString()
+ {
+ return "ChannelClosed";
+ }
+
+ @Override
+ public Void getBody()
+ {
+ return null;
+ }
+ }
+
+ public abstract byte[] getProtocolHeader();
+
+ protected abstract Interaction newInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
new file mode 100644
index 0000000..9767b40
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+public class HeaderResponse implements Response<byte[]>
+{
+ private final byte[] _header;
+
+ public HeaderResponse(final byte[] header)
+ {
+ _header = header;
+ }
+
+ @Override
+ public byte[] getBody()
+ {
+ return _header;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "HeaderResponse{" +
+ "_header=" + Arrays.toString(_header) +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
new file mode 100644
index 0000000..369cfd1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+public interface InputDecoder
+{
+ Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
new file mode 100644
index 0000000..2d5fb45
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+
+ private final BlockingQueue<Response<?>> _responseQueue;
+ private final InputDecoder _inputDecoder;
+
+ private ByteBuffer _inputBuffer = ByteBuffer.allocate(0);
+
+ InputHandler(final BlockingQueue<Response<?>> queue, InputDecoder inputDecoder)
+ {
+ _responseQueue = queue;
+ _inputDecoder = inputDecoder;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+ {
+ ByteBuf buf = (ByteBuf) msg;
+ ByteBuffer byteBuffer = ByteBuffer.allocate(buf.readableBytes());
+ byteBuffer.put(buf.nioBuffer());
+ byteBuffer.flip();
+ LOGGER.debug("Incoming {} byte(s)", byteBuffer.remaining());
+
+ if (_inputBuffer.hasRemaining())
+ {
+ ByteBuffer old = _inputBuffer;
+ _inputBuffer = ByteBuffer.allocate(_inputBuffer.remaining() + byteBuffer.remaining());
+ _inputBuffer.put(old);
+ _inputBuffer.put(byteBuffer);
+ _inputBuffer.flip();
+ }
+ else
+ {
+ _inputBuffer = byteBuffer;
+ }
+
+ _responseQueue.addAll(_inputDecoder.decode(_inputBuffer));
+
+ LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
+
+ if (_inputBuffer.hasRemaining())
+ {
+ _inputBuffer.compact();
+ _inputBuffer.flip();
+ }
+
+ ReferenceCountUtil.release(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
new file mode 100644
index 0000000..238c0a5
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public abstract class Interaction<I extends Interaction>
+{
+ private final FrameTransport _transport;
+ private ListenableFuture<?> _latestFuture;
+ private Response<?> _latestResponse;
+
+ public Interaction(final FrameTransport frameTransport)
+ {
+ _transport = frameTransport;
+ }
+
+ public I consumeResponse(final Class<?>... responseTypes) throws Exception
+ {
+ sync();
+ _latestResponse = getNextResponse();
+ final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+ if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+ || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+ {
+ return getInteraction();
+ }
+ acceptableResponseClasses.remove(null);
+ if (_latestResponse != null)
+ {
+ for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ {
+ if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ return getInteraction();
+ }
+ }
+ }
+ throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+ acceptableResponseClasses,
+ _latestResponse == null ? null : _latestResponse.getBody()));
+ }
+
+ protected Response<?> getNextResponse() throws Exception
+ {
+ return _transport.getNextResponse();
+ }
+
+ public I sync() throws InterruptedException, ExecutionException, TimeoutException
+ {
+ if (_latestFuture != null)
+ {
+ _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ _latestFuture = null;
+ }
+ return getInteraction();
+ }
+
+ public Response<?> getLatestResponse() throws Exception
+ {
+ sync();
+ return _latestResponse;
+ }
+
+ public <T> T getLatestResponse(Class<T> type) throws Exception
+ {
+ sync();
+ if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+ type.getSimpleName(),
+ _latestResponse.getBody()));
+ }
+
+ return (T) _latestResponse.getBody();
+ }
+
+ protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return future;
+ }
+
+ public I negotiateProtocol() throws Exception
+ {
+ final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
+ if (_latestFuture != null)
+ {
+ _latestFuture = allAsList(_latestFuture, future);
+ }
+ else
+ {
+ _latestFuture = future;
+ }
+ return getInteraction();
+ }
+
+ protected FrameTransport getTransport()
+ {
+ return _transport;
+ }
+
+ protected abstract byte[] getProtocolHeader();
+
+ protected abstract I getInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
new file mode 100644
index 0000000..292ae9a
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+public class Matchers
+{
+ public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+ {
+ return new BaseMatcher<Response>()
+ {
+ @Override
+ public void describeTo(final Description description)
+ {
+ description.appendValue(new HeaderResponse(expectedHeader));
+ }
+
+ @Override
+ public boolean matches(final Object o)
+ {
+ if (o == null)
+ {
+ return false;
+ }
+ if (!(o instanceof HeaderResponse))
+ {
+ return false;
+ }
+ if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
+ {
+ return false;
+ }
+ return true;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
new file mode 100644
index 0000000..a6a4a47
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+public interface OutputEncoder
+{
+ ByteBuffer encode(Object msg);
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
new file mode 100644
index 0000000..40a2ca7
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+ private final OutputEncoder _outputEncoder;
+
+ OutputHandler(final OutputEncoder outputEncoder)
+ {
+ _outputEncoder = outputEncoder;
+ }
+
+ @Override
+ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+ {
+ ByteBuffer byteBuffer = _outputEncoder.encode(msg);
+ if (byteBuffer != null)
+ {
+ send(ctx, byteBuffer, promise);
+ }
+ else
+ {
+ super.write(ctx, msg, promise);
+ }
+ }
+
+ private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+ {
+ byte[] data = new byte[dataByteBuffer.remaining()];
+ dataByteBuffer.get(data);
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(data);
+ try
+ {
+ OutputHandler.super.write(ctx, buffer, promise);
+ }
+ catch (Exception e)
+ {
+ promise.setFailure(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
new file mode 100644
index 0000000..debc06f
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+public interface Response<T>
+{
+ T getBody();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
new file mode 100644
index 0000000..db6d7a1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface SpecificationTest
+{
+ String section();
+ String description();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/6] qpid-broker-j git commit: QPID-8042: [Broker-J][AMQP 1.0]
Process SASL frames first before parsing the remaining part of incoming byte
buffer
Posted by kw...@apache.org.
QPID-8042: [Broker-J][AMQP 1.0] Process SASL frames first before parsing the remaining part of incoming byte buffer
Cherry picked from 89e01ec
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/33cb908e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/33cb908e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/33cb908e
Branch: refs/heads/7.0.x
Commit: 33cb908e42bd09cbde3e663e686d95481fe6911e
Parents: d1a6ca2
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Dec 8 17:33:00 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:30 2018 +0000
----------------------------------------------------------------------
.../protocol/v1_0/framing/FrameHandler.java | 11 ++++-
.../qpid/tests/protocol/v1_0/FrameDecoder.java | 39 ++++++++-------
.../v1_0/transport/security/sasl/SaslTest.java | 50 +++++++++++++++-----
3 files changed, 70 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/33cb908e/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index f7e4029..23d08c3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -202,19 +202,26 @@ public class FrameHandler implements ProtocolHandler
return frameBody;
}
});
+
+ if (_isSasl)
+ {
+ break;
+ }
}
catch (AmqpErrorException ex)
{
frameParsingError = ex.getError();
}
}
- _connectionHandler.receive(channelFrameBodies);
if (frameParsingError != null)
{
_connectionHandler.handleError(frameParsingError);
_errored = true;
-
+ }
+ else
+ {
+ _connectionHandler.receive(channelFrameBodies);
}
}
catch (RuntimeException e)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/33cb908e/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 4dc06cb..0c94ad7 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -90,27 +90,34 @@ public class FrameDecoder implements InputDecoder
@Override
public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
{
- List<Response<?>> responses = new ArrayList<>();
+
QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
- switch(_state)
+ int remaining;
+
+ do
{
- case HEADER:
- if (inputBuffer.remaining() >= 8)
- {
- byte[] header = new byte[8];
- inputBuffer.get(header);
- responses.add(new HeaderResponse(header));
- _state = ParsingState.PERFORMATIVES;
+ remaining = qpidByteBuffer.remaining();
+ switch(_state)
+ {
+ case HEADER:
+ if (inputBuffer.remaining() >= 8)
+ {
+ byte[] header = new byte[8];
+ inputBuffer.get(header);
+ _connectionHandler._responseQueue.add(new HeaderResponse(header));
+ _state = ParsingState.PERFORMATIVES;
+ }
+ break;
+ case PERFORMATIVES:
_frameHandler.parse(qpidByteBuffer);
- }
- break;
- case PERFORMATIVES:
- _frameHandler.parse(qpidByteBuffer);
- break;
- default:
- throw new IllegalStateException("Unexpected state : " + _state);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state : " + _state);
+ }
}
+ while (qpidByteBuffer.remaining() != remaining);
+ List<Response<?>> responses = new ArrayList<>();
Response<?> r;
while((r = _connectionHandler._responseQueue.poll())!=null)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/33cb908e/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 1ca3f20..1b81507 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -36,18 +37,23 @@ import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameEncoder;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -106,7 +112,6 @@ public class SaslTest extends BrokerAdminUsingTestBase
}
}
- @Ignore("QPID-8042")
@Test
@SpecificationTest(section = "2.4.2",
description = "For applications that use many short-lived connections,"
@@ -118,17 +123,38 @@ public class SaslTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
- final Binary initialResponse = new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
final Interaction interaction = transport.newInteraction();
- interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
- .negotiateProtocol()
- .saslMechanism(PLAIN)
- .saslInitialResponse(initialResponse)
- .saslInit()
- .protocolHeader(AMQP_HEADER_BYTES)
- .negotiateProtocol()
- .openContainerId("testContainerId")
- .open();
+ FrameEncoder frameEncoder = new FrameEncoder();
+
+ SaslInit saslInit = new SaslInit();
+ saslInit.setMechanism(PLAIN);
+ saslInit.setInitialResponse(new Binary(String.format("\0%s\0%s", _username, _password)
+ .getBytes(StandardCharsets.US_ASCII)));
+ ByteBuffer saslInitByteBuffer = frameEncoder.encode(new SASLFrame(saslInit));
+
+ Open open = new Open();
+ open.setContainerId("containerId");
+ ByteBuffer openByteBuffer = frameEncoder.encode(new TransportFrame(0, open));
+
+ int initSize = saslInitByteBuffer.remaining();
+ int openSize = openByteBuffer.remaining();
+ int dataLength = SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize + openSize;
+ byte[] data = new byte[dataLength];
+
+ System.arraycopy(SASL_AMQP_HEADER_BYTES, 0, data, 0, SASL_AMQP_HEADER_BYTES.length);
+ saslInitByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length, initSize);
+ System.arraycopy(AMQP_HEADER_BYTES,
+ 0,
+ data,
+ SASL_AMQP_HEADER_BYTES.length + initSize,
+ AMQP_HEADER_BYTES.length);
+ openByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize, openSize);
+
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(data);
+
+ transport.sendPerformative(buffer);
+
final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/6] qpid-broker-j git commit: QPID-8042: [Broker-J][AMQP 1.0] Add
protocol test for pipelined connection open
Posted by kw...@apache.org.
QPID-8042: [Broker-J][AMQP 1.0] Add protocol test for pipelined connection open
Cherry picked from 9daed1e
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/88fd1245
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/88fd1245
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/88fd1245
Branch: refs/heads/7.0.x
Commit: 88fd1245c6b4ef4fc47314079f24329f5fd0646e
Parents: deab458
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 20 11:19:42 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:15 2018 +0000
----------------------------------------------------------------------
.../v1_0/transport/security/sasl/SaslTest.java | 47 +++++++++++++++++++-
.../apache/qpid/tests/protocol/Interaction.java | 8 ++++
2 files changed, 53 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88fd1245/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 6fea928..d7f3bc1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -37,6 +37,7 @@ import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -45,11 +46,12 @@ import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.SpecificationTest;
public class SaslTest extends BrokerAdminUsingTestBase
{
@@ -104,6 +106,47 @@ public class SaslTest extends BrokerAdminUsingTestBase
}
}
+ @Ignore("QPID-8042")
+ @Test
+ @SpecificationTest(section = "2.4.2",
+ description = "For applications that use many short-lived connections,"
+ + " it MAY be desirable to pipeline the connection negotiation process."
+ + " A peer MAY do this by starting to send subsequent frames before receiving"
+ + " the partner’s connection header or open frame")
+ public void saslSuccessfulAuthenticationWithPipelinedFrames() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
+ try (FrameTransport transport = new FrameTransport(addr, true).connect())
+ {
+ final Binary initialResponse = new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
+ final Interaction interaction = transport.newInteraction();
+ interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
+ .negotiateProtocol()
+ .saslMechanism(PLAIN)
+ .saslInitialResponse(initialResponse)
+ .saslInit()
+ .protocolHeader(AMQP_HEADER_BYTES)
+ .negotiateProtocol()
+ .openContainerId("testContainerId")
+ .open();
+
+ final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
+ assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
+
+ SaslMechanisms saslMechanismsResponse = interaction.consumeResponse().getLatestResponse(SaslMechanisms.class);
+ assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN));
+
+ SaslOutcome saslOutcome = interaction.consumeResponse().getLatestResponse(SaslOutcome.class);
+ assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK));
+
+ final byte[] headerResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
+ assertThat(headerResponse, is(equalTo(AMQP_HEADER_BYTES)));
+
+ interaction.consumeResponse().getLatestResponse(Open.class);
+ interaction.doCloseConnection();
+ }
+ }
+
@Test
@SpecificationTest(section = "5.3.2",
description = "SASL Negotiation [...] challenge/response step occurs once")
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88fd1245/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
index 238c0a5..2390227 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -92,6 +92,14 @@ public abstract class Interaction<I extends Interaction>
public <T> T getLatestResponse(Class<T> type) throws Exception
{
sync();
+
+ if (_latestResponse.getBody() == null)
+ {
+ throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+ type.getSimpleName(),
+ _latestResponse.getClass()));
+ }
+
if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
{
throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/6] qpid-broker-j git commit: QPID-8042: [System Tests] Improve
pipe-lining of frames in protocol tests
Posted by kw...@apache.org.
QPID-8042: [System Tests] Improve pipe-lining of frames in protocol tests
Cherry picked from 2cbb629
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/d1a6ca21
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/d1a6ca21
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/d1a6ca21
Branch: refs/heads/7.0.x
Commit: d1a6ca216637907315de91575058e7b6d56e1011
Parents: 88fd124
Author: Alex Rudyy <or...@apache.org>
Authored: Mon Nov 20 16:55:39 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:30 2018 +0000
----------------------------------------------------------------------
.../qpid/tests/protocol/v1_0/Interaction.java | 6 +-
.../soleconn/CloseExistingPolicy.java | 6 +-
.../v1_0/transport/security/sasl/SaslTest.java | 2 +-
.../qpid/tests/protocol/FrameTransport.java | 26 +++------
.../apache/qpid/tests/protocol/Interaction.java | 8 +--
.../qpid/tests/protocol/OutputHandler.java | 59 +++++++++++++++++++-
6 files changed, 76 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 7d73ce8..4aad6ee 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -153,7 +153,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
Close close = new Close();
sendPerformative(close, UnsignedShort.valueOf((short) 0));
- Response<?> response = getNextResponse();
+ Response<?> response = consumeResponse().getLatestResponse();
if (!(response.getBody() instanceof Close))
{
throw new IllegalStateException(String.format(
@@ -983,7 +983,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
{
SASLFrame transportFrame = new SASLFrame(frameBody);
- sendPerformativeAndChainFuture(transportFrame, true);
+ sendPerformativeAndChainFuture(transportFrame);
}
private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
@@ -1001,7 +1001,7 @@ public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Inte
duplicate = payload.duplicate();
}
transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
- ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+ ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame);
if (frameBody instanceof Transfer)
{
listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
index 4bee689..5be5482 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/soleconn/CloseExistingPolicy.java
@@ -106,7 +106,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
.openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
.openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
CLOSE_EXISTING))
- .open();
+ .open()
+ .sync();
final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
@@ -145,7 +146,8 @@ public class CloseExistingPolicy extends BrokerAdminUsingTestBase
.openDesiredCapabilities(SOLE_CONNECTION_FOR_CONTAINER)
.openProperties(Collections.singletonMap(SOLE_CONNECTION_ENFORCEMENT_POLICY,
CLOSE_EXISTING))
- .open();
+ .open()
+ .sync();
final Close close1 = interaction1.consumeResponse().getLatestResponse(Close.class);
assertThat(close1.getError(), is(notNullValue()));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index d7f3bc1..1ca3f20 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -258,7 +258,7 @@ public class SaslTest extends BrokerAdminUsingTestBase
assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
interaction.consumeResponse(SaslMechanisms.class);
- interaction.open();
+ interaction.open().sync();
transport.assertNoMoreResponsesAndChannelClosed();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
index daf500d..28dc02e 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -31,14 +31,12 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
@@ -138,26 +136,15 @@ public abstract class FrameTransport implements AutoCloseable
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bytes);
_channel.write(buffer, promise);
- _channel.flush();
return JdkFutureAdapters.listenInPoolThread(promise);
}
- public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+ public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
{
Preconditions.checkState(_channel != null, "Not connected");
- if (!sync)
- {
- ChannelPromise promise = _channel.newPromise();
- _channel.write(data, promise);
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
- else
- {
- ChannelFuture channelFuture = _channel.writeAndFlush(data);
- channelFuture.sync();
- return Futures.immediateFuture(null);
- }
+ ChannelPromise promise = _channel.newPromise();
+ _channel.write(data, promise);
+ return JdkFutureAdapters.listenInPoolThread(promise);
}
public <T extends Response<?>> T getNextResponse() throws Exception
@@ -177,6 +164,11 @@ public abstract class FrameTransport implements AutoCloseable
assertThat(_channelClosedSeen, is(true));
}
+ public void flush()
+ {
+ _channel.flush();
+ }
+
private static class ChannelClosedResponse implements Response<Void>
{
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
index 2390227..b6e631d 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -75,6 +75,7 @@ public abstract class Interaction<I extends Interaction>
public I sync() throws InterruptedException, ExecutionException, TimeoutException
{
+ _transport.flush();
if (_latestFuture != null)
{
_latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -85,14 +86,11 @@ public abstract class Interaction<I extends Interaction>
public Response<?> getLatestResponse() throws Exception
{
- sync();
return _latestResponse;
}
public <T> T getLatestResponse(Class<T> type) throws Exception
{
- sync();
-
if (_latestResponse.getBody() == null)
{
throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
@@ -110,9 +108,9 @@ public abstract class Interaction<I extends Interaction>
return (T) _latestResponse.getBody();
}
- protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+ protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+ final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
if (_latestFuture != null)
{
_latestFuture = allAsList(_latestFuture, future);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/d1a6ca21/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
index 40a2ca7..5d40447 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -21,6 +21,8 @@
package org.apache.qpid.tests.protocol;
import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -31,10 +33,14 @@ import io.netty.channel.ChannelPromise;
public class OutputHandler extends ChannelOutboundHandlerAdapter
{
private final OutputEncoder _outputEncoder;
+ private Queue<ByteBufferPromisePair> _cachedEncodedFramePromisePairs;
+ private int _encodedSize;
OutputHandler(final OutputEncoder outputEncoder)
{
_outputEncoder = outputEncoder;
+ _cachedEncodedFramePromisePairs = new LinkedList<>();
+ _encodedSize = 0;
}
@Override
@@ -51,12 +57,44 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
}
}
- private void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+ private synchronized void send(ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
{
- byte[] data = new byte[dataByteBuffer.remaining()];
- dataByteBuffer.get(data);
+ _cachedEncodedFramePromisePairs.add(new ByteBufferPromisePair(dataByteBuffer, promise));
+ _encodedSize += dataByteBuffer.remaining();
+ }
+
+
+ @Override
+ public synchronized void flush(final ChannelHandlerContext ctx) throws Exception
+ {
+ final ChannelPromise promise = ctx.channel().newPromise();
+ byte[] data = new byte[_encodedSize];
+
+ int offset = 0;
+ while(offset < _encodedSize)
+ {
+ ByteBufferPromisePair currentPair = _cachedEncodedFramePromisePairs.poll();
+ int remaining = currentPair.byteBuffer.remaining();
+ currentPair.byteBuffer.get(data, offset, remaining) ;
+ offset += remaining;
+
+ promise.addListener(future -> {
+ if (future.isSuccess())
+ {
+ currentPair.channelPromise.setSuccess();
+ }
+ else
+ {
+ currentPair.channelPromise.setFailure(future.cause());
+ }
+ });
+ }
+
+ _encodedSize = 0;
+
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(data);
+
try
{
OutputHandler.super.write(ctx, buffer, promise);
@@ -65,5 +103,20 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
{
promise.setFailure(e);
}
+
+ super.flush(ctx);
+ }
+
+ class ByteBufferPromisePair
+ {
+ private ByteBuffer byteBuffer;
+ private ChannelPromise channelPromise;
+
+ ByteBufferPromisePair(final ByteBuffer byteBuffer, final ChannelPromise channelPromise)
+ {
+ this.byteBuffer = byteBuffer;
+ this.channelPromise = channelPromise;
+ }
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/6] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests]
Introduce new module 'protocol-tests-core' and move test common functionality
into it
Posted by kw...@apache.org.
QPID-8038: [Broker-J][System Tests] Introduce new module 'protocol-tests-core' and move test common functionality into it
Cherry picked from 06e53d7 with manual resolutions.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/deab4580
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/deab4580
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/deab4580
Branch: refs/heads/7.0.x
Commit: deab4580b7a876cfe16d35fe40804ac19e277cfc
Parents: 9a76285
Author: Alex Rudyy <or...@apache.org>
Authored: Sat Nov 18 01:31:29 2017 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:34:28 2018 +0000
----------------------------------------------------------------------
pom.xml | 13 +
systests/protocol-tests-amqp-1-0/pom.xml | 35 +--
.../qpid/tests/protocol/v1_0/FrameDecoder.java | 276 +++++++++++++++++
.../qpid/tests/protocol/v1_0/FrameEncoder.java | 93 ++++++
.../tests/protocol/v1_0/FrameTransport.java | 205 +------------
.../tests/protocol/v1_0/HeaderResponse.java | 46 ---
.../qpid/tests/protocol/v1_0/InputHandler.java | 305 -------------------
.../qpid/tests/protocol/v1_0/Interaction.java | 141 +++------
.../qpid/tests/protocol/v1_0/Matchers.java | 60 ----
.../qpid/tests/protocol/v1_0/OutputHandler.java | 96 ------
.../protocol/v1_0/PerformativeResponse.java | 1 +
.../qpid/tests/protocol/v1_0/Response.java | 25 --
.../protocol/v1_0/SaslPerformativeResponse.java | 1 +
.../tests/protocol/v1_0/SpecificationTest.java | 34 ---
.../tests/protocol/v1_0/DecodeErrorTest.java | 2 +
.../bindmapjms/TemporaryDestinationTest.java | 4 +-
.../extensions/management/ManagementTest.java | 5 +-
.../extensions/websocket/WebSocketTest.java | 13 +-
.../v1_0/messaging/DeleteOnCloseTest.java | 2 +-
.../protocol/v1_0/messaging/MessageFormat.java | 4 +-
.../v1_0/messaging/MultiTransferTest.java | 4 +-
.../protocol/v1_0/messaging/OutcomeTest.java | 4 +-
.../protocol/v1_0/messaging/TransferTest.java | 11 +-
.../v1_0/transaction/DischargeTest.java | 2 +-
.../transaction/TransactionalTransferTest.java | 4 +-
.../v1_0/transport/ProtocolHeaderTest.java | 2 +-
.../v1_0/transport/connection/OpenTest.java | 7 +-
.../v1_0/transport/link/AttachTest.java | 2 +-
.../protocol/v1_0/transport/link/FlowTest.java | 2 +-
.../transport/link/ResumeDeliveriesTest.java | 12 +-
.../v1_0/transport/security/sasl/SaslTest.java | 2 +-
.../v1_0/transport/session/BeginTest.java | 8 +-
systests/protocol-tests-core/pom.xml | 75 +++++
.../qpid/tests/protocol/FrameTransport.java | 198 ++++++++++++
.../qpid/tests/protocol/HeaderResponse.java | 46 +++
.../qpid/tests/protocol/InputDecoder.java | 30 ++
.../qpid/tests/protocol/InputHandler.java | 81 +++++
.../apache/qpid/tests/protocol/Interaction.java | 141 +++++++++
.../apache/qpid/tests/protocol/Matchers.java | 60 ++++
.../qpid/tests/protocol/OutputEncoder.java | 29 ++
.../qpid/tests/protocol/OutputHandler.java | 69 +++++
.../apache/qpid/tests/protocol/Response.java | 25 ++
.../qpid/tests/protocol/SpecificationTest.java | 34 +++
43 files changed, 1282 insertions(+), 927 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1262002..1287501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -196,6 +196,7 @@
<module>systests</module>
<module>systests/systests-utils</module>
<module>systests/qpid-systests-jms_2.0</module>
+ <module>systests/protocol-tests-core</module>
<module>systests/protocol-tests-amqp-1-0</module>
<module>systests/end-to-end-conversion-tests</module>
<module>perftests</module>
@@ -407,6 +408,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-amqp-1-0</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- External dependencies -->
<dependency>
<groupId>org.apache.qpid</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/pom.xml b/systests/protocol-tests-amqp-1-0/pom.xml
index 56fc1f7..c392cf7 100644
--- a/systests/protocol-tests-amqp-1-0/pom.xml
+++ b/systests/protocol-tests-amqp-1-0/pom.xml
@@ -56,6 +56,11 @@
<dependency>
<groupId>org.apache.qpid</groupId>
+ <artifactId>protocol-tests-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
<artifactId>qpid-systests-utils</artifactId>
</dependency>
@@ -94,38 +99,9 @@
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-bdbstore</artifactId>
<scope>test</scope>
- <optional>true</optional>
</dependency>
<dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-buffer</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-common</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-codec-http</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-handler</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-transport</artifactId>
- </dependency>
- <dependency>
- <groupId>org.hamcrest</groupId>
- <artifactId>hamcrest-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
</dependency>
@@ -133,6 +109,7 @@
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-integration</artifactId>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
new file mode 100644
index 0000000..4dc06cb
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.HeaderResponse;
+import org.apache.qpid.tests.protocol.InputDecoder;
+import org.apache.qpid.tests.protocol.Response;
+
+public class FrameDecoder implements InputDecoder
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(FrameDecoder.class);
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+ private final MyConnectionHandler _connectionHandler;
+ private volatile FrameHandler _frameHandler;
+
+ private enum ParsingState
+ {
+ HEADER,
+ PERFORMATIVES;
+ }
+
+ private final ValueHandler _valueHandler;
+
+ private volatile ParsingState _state = ParsingState.HEADER;
+
+ public FrameDecoder(final boolean isSasl)
+ {
+ _valueHandler = new ValueHandler(TYPE_REGISTRY);
+ _connectionHandler = new MyConnectionHandler();
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
+ }
+
+ @Override
+ public Collection<Response<?>> decode(final ByteBuffer inputBuffer)
+ {
+ List<Response<?>> responses = new ArrayList<>();
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(inputBuffer);
+ switch(_state)
+ {
+ case HEADER:
+ if (inputBuffer.remaining() >= 8)
+ {
+ byte[] header = new byte[8];
+ inputBuffer.get(header);
+ responses.add(new HeaderResponse(header));
+ _state = ParsingState.PERFORMATIVES;
+ _frameHandler.parse(qpidByteBuffer);
+ }
+ break;
+ case PERFORMATIVES:
+ _frameHandler.parse(qpidByteBuffer);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state : " + _state);
+ }
+
+ Response<?> r;
+ while((r = _connectionHandler._responseQueue.poll())!=null)
+ {
+ responses.add(r);
+ }
+ return responses;
+ }
+
+ private void resetInputHandlerAfterSaslOutcome()
+ {
+ _state = ParsingState.HEADER;
+ _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
+ }
+
+ private class MyConnectionHandler implements ConnectionHandler
+ {
+ private volatile int _frameSize = 512;
+ private Queue<Response<?>> _responseQueue = new ConcurrentLinkedQueue<>();
+
+ @Override
+ public void receiveOpen(final int channel, final Open close)
+ {
+ }
+
+ @Override
+ public void receiveClose(final int channel, final Close close)
+ {
+
+ }
+
+ @Override
+ public void receiveBegin(final int channel, final Begin begin)
+ {
+
+ }
+
+ @Override
+ public void receiveEnd(final int channel, final End end)
+ {
+
+ }
+
+ @Override
+ public void receiveAttach(final int channel, final Attach attach)
+ {
+
+ }
+
+ @Override
+ public void receiveDetach(final int channel, final Detach detach)
+ {
+
+ }
+
+ @Override
+ public void receiveTransfer(final int channel, final Transfer transfer)
+ {
+
+ }
+
+ @Override
+ public void receiveDisposition(final int channel, final Disposition disposition)
+ {
+
+ }
+
+ @Override
+ public void receiveFlow(final int channel, final Flow flow)
+ {
+
+ }
+
+ @Override
+ public int getMaxFrameSize()
+ {
+ return _frameSize;
+ }
+
+ @Override
+ public int getChannelMax()
+ {
+ return UnsignedShort.MAX_VALUE.intValue();
+ }
+
+ @Override
+ public void handleError(final Error parsingError)
+ {
+ LOGGER.error("Unexpected error {}", parsingError);
+ }
+
+ @Override
+ public boolean closedForInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void receive(final List<ChannelFrameBody> channelFrameBodies)
+ {
+ for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
+ {
+ Response response;
+ Object val = channelFrameBody.getFrameBody();
+ int channel = channelFrameBody.getChannel();
+ if (val instanceof FrameBody)
+ {
+ FrameBody frameBody = (FrameBody) val;
+ if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
+ {
+ _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
+ }
+ response = new PerformativeResponse((short) channel, frameBody);
+ }
+ else if (val instanceof SaslFrameBody)
+ {
+ SaslFrameBody frameBody = (SaslFrameBody) val;
+ response = new SaslPerformativeResponse((short) channel, frameBody);
+
+ if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
+ {
+ resetInputHandlerAfterSaslOutcome();
+ }
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
+ }
+
+ _responseQueue.add(response);
+ }
+ }
+
+ @Override
+ public void receiveSaslInit(final SaslInit saslInit)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslResponse(final SaslResponse saslResponse)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+ {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
new file mode 100644
index 0000000..56d6e6f
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameEncoder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.OutputEncoder;
+
+public class FrameEncoder implements OutputEncoder
+{
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+
+ @Override
+ public ByteBuffer encode(final Object msg)
+ {
+ if (msg instanceof AMQFrame)
+ {
+ List<ByteBuffer> buffers = new ArrayList<>();
+ FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ byte[] data = new byte[msg.remaining()];
+ msg.get(data);
+ buffers.add(ByteBuffer.wrap(data));
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
+ _frameWriter.send(((AMQFrame) msg));
+
+ int remaining = 0;
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ remaining += byteBuffer.remaining();
+ }
+ ByteBuffer result = ByteBuffer.allocate(remaining);
+ for (ByteBuffer byteBuffer: buffers)
+ {
+ result.put(byteBuffer);
+ }
+ result.flip();
+ return result;
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 4d53751..dd59757 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -19,58 +19,12 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
+import static java.nio.charset.StandardCharsets.UTF_8;
import java.net.InetSocketAddress;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.JdkFutureAdapters;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class FrameTransport implements AutoCloseable
+public class FrameTransport extends org.apache.qpid.tests.protocol.FrameTransport
{
- public static final long RESPONSE_TIMEOUT = Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout",6000);
- private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
-
- private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
-
- private final EventLoopGroup _workerGroup;
- private final InetSocketAddress _brokerAddress;
- private final boolean _isSasl;
-
- private Channel _channel;
- private volatile boolean _channelClosedSeen = false;
-
public FrameTransport(final InetSocketAddress brokerAddress)
{
this(brokerAddress, false);
@@ -78,165 +32,20 @@ public class FrameTransport implements AutoCloseable
public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl)
{
- _brokerAddress = brokerAddress;
- _isSasl = isSasl;
- _workerGroup = new NioEventLoopGroup();
- }
-
- public InetSocketAddress getBrokerAddress()
- {
- return _brokerAddress;
+ super(brokerAddress, new FrameDecoder(isSasl), new FrameEncoder());
}
+ @Override
public FrameTransport connect()
{
- try
- {
- Bootstrap b = new Bootstrap();
- b.group(_workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>()
- {
- @Override
- public void initChannel(SocketChannel ch) throws Exception
- {
- ChannelPipeline pipeline = ch.pipeline();
- buildInputOutputPipeline(pipeline);
- }
- });
-
- _channel = b.connect(_brokerAddress).sync().channel();
- _channel.closeFuture().addListener(future ->
- {
- _channelClosedSeen = true;
- _queue.add(CHANNEL_CLOSED_RESPONSE);
- });
- }
- catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
+ super.connect();
return this;
}
- protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
- {
- pipeline.addLast(new InputHandler(_queue, _isSasl)).addLast(new OutputHandler());
- }
-
@Override
- public void close() throws Exception
- {
- try
- {
- if (_channel != null)
- {
- _channel.disconnect().sync();
- _channel.close().sync();
- _channel = null;
- }
- }
- finally
- {
- _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
- }
- }
-
- public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
- {
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(bytes);
- _channel.write(buffer, promise);
- _channel.flush();
- return JdkFutureAdapters.listenInPoolThread(promise);
- }
-
- public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception
+ public byte[] getProtocolHeader()
{
- Preconditions.checkState(_channel != null, "Not connected");
- ChannelPromise promise = _channel.newPromise();
- final TransportFrame transportFrame;
- try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
- {
- final QpidByteBuffer duplicate;
- if (payload == null)
- {
- duplicate = null;
- }
- else
- {
- duplicate = payload.duplicate();
- }
- transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
- _channel.write(transportFrame, promise);
- _channel.flush();
- final ListenableFuture<Void> listenableFuture = JdkFutureAdapters.listenInPoolThread(promise);
- if (frameBody instanceof Transfer)
- {
- listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
- }
- if (duplicate != null)
- {
- listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
- }
- return listenableFuture;
- }
- }
-
- public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception
- {
- SASLFrame transportFrame = new SASLFrame(frameBody);
- ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
- channelFuture.sync();
- return JdkFutureAdapters.listenInPoolThread(channelFuture);
- }
-
- public <T extends Response<?>> T getNextResponse() throws Exception
- {
- return (T)_queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- }
-
- public void doCloseConnection() throws Exception
- {
- Close close = new Close();
-
- sendPerformative(close, UnsignedShort.valueOf((short) 0));
- PerformativeResponse response = getNextResponse();
- if (!(response.getBody() instanceof Close))
- {
- throw new IllegalStateException(String.format(
- "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
- }
- }
-
- public void assertNoMoreResponses() throws Exception
- {
- Response response = getNextResponse();
- assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
- }
-
- public void assertNoMoreResponsesAndChannelClosed() throws Exception
- {
- assertNoMoreResponses();
- assertThat(_channelClosedSeen, is(true));
- }
-
- private static class ChannelClosedResponse implements Response<Void>
- {
- @Override
- public String toString()
- {
- return "ChannelClosed";
- }
-
- @Override
- public Void getBody()
- {
- return null;
- }
+ return "AMQP\0\1\0\0".getBytes(UTF_8);
}
public Interaction newInteraction()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
deleted file mode 100644
index 9503113..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-public class HeaderResponse implements Response<byte[]>
-{
- private final byte[] _header;
-
- public HeaderResponse(final byte[] header)
- {
- _header = header;
- }
-
- @Override
- public byte[] getBody()
- {
- return _header;
- }
-
- @Override
- public String toString()
- {
- return "HeaderResponse{" +
- "_header=" + Arrays.toString(_header) +
- '}';
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
deleted file mode 100644
index e3acd24..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.util.ReferenceCountUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
-import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.ChannelFrameBody;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-
-public class InputHandler extends ChannelInboundHandlerAdapter
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
- private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
-
- private enum ParsingState
- {
- HEADER,
- PERFORMATIVES
- }
-
- private final MyConnectionHandler _connectionHandler;
- private final ValueHandler _valueHandler;
- private final BlockingQueue<Response<?>> _responseQueue;
-
- private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
- private volatile FrameHandler _frameHandler;
- private volatile ParsingState _state = ParsingState.HEADER;
-
- public InputHandler(final BlockingQueue<Response<?>> queue, final boolean isSasl)
- {
-
- _valueHandler = new ValueHandler(TYPE_REGISTRY);
- _connectionHandler = new MyConnectionHandler();
- _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl);
-
- _responseQueue = queue;
- }
-
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
- {
- ByteBuf buf = (ByteBuf) msg;
- QpidByteBuffer qpidBuf = QpidByteBuffer.allocate(buf.readableBytes());
- qpidBuf.put(buf.nioBuffer());
- qpidBuf.flip();
- LOGGER.debug("Incoming {} byte(s)", qpidBuf.remaining());
-
- if (_inputBuffer.hasRemaining())
- {
- QpidByteBuffer old = _inputBuffer;
- _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
- _inputBuffer.put(old);
- _inputBuffer.put(qpidBuf);
- old.dispose();
- qpidBuf.dispose();
- _inputBuffer.flip();
- }
- else
- {
- _inputBuffer.dispose();
- _inputBuffer = qpidBuf;
- }
-
- doParsing();
-
- LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
-
- if (_inputBuffer.hasRemaining())
- {
- _inputBuffer.compact();
- _inputBuffer.flip();
- }
-
- ReferenceCountUtil.release(msg);
- }
-
- private void doParsing()
- {
- switch(_state)
- {
- case HEADER:
- if (_inputBuffer.remaining() >= 8)
- {
- byte[] header = new byte[8];
- _inputBuffer.get(header);
- _responseQueue.add(new HeaderResponse(header));
- _state = ParsingState.PERFORMATIVES;
- doParsing();
- }
- break;
- case PERFORMATIVES:
- _frameHandler.parse(_inputBuffer);
- break;
- default:
- throw new IllegalStateException("Unexpected state : " + _state);
- }
- }
-
- private void resetInputHandlerAfterSaslOutcome()
- {
- _state = ParsingState.HEADER;
- _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false);
- }
-
- private class MyConnectionHandler implements ConnectionHandler
- {
- private volatile int _frameSize = 512;
-
- @Override
- public void receiveOpen(final int channel, final Open close)
- {
- }
-
- @Override
- public void receiveClose(final int channel, final Close close)
- {
-
- }
-
- @Override
- public void receiveBegin(final int channel, final Begin begin)
- {
-
- }
-
- @Override
- public void receiveEnd(final int channel, final End end)
- {
-
- }
-
- @Override
- public void receiveAttach(final int channel, final Attach attach)
- {
-
- }
-
- @Override
- public void receiveDetach(final int channel, final Detach detach)
- {
-
- }
-
- @Override
- public void receiveTransfer(final int channel, final Transfer transfer)
- {
-
- }
-
- @Override
- public void receiveDisposition(final int channel, final Disposition disposition)
- {
-
- }
-
- @Override
- public void receiveFlow(final int channel, final Flow flow)
- {
-
- }
-
- @Override
- public int getMaxFrameSize()
- {
- return _frameSize;
- }
-
- @Override
- public int getChannelMax()
- {
- return UnsignedShort.MAX_VALUE.intValue();
- }
-
- @Override
- public void handleError(final Error parsingError)
- {
- LOGGER.error("Unexpected error {}", parsingError);
- }
-
- @Override
- public boolean closedForInput()
- {
- return false;
- }
-
- @Override
- public void receive(final List<ChannelFrameBody> channelFrameBodies)
- {
- for (final ChannelFrameBody channelFrameBody : channelFrameBodies)
- {
- Response response;
- Object val = channelFrameBody.getFrameBody();
- int channel = channelFrameBody.getChannel();
- if (val instanceof FrameBody)
- {
- FrameBody frameBody = (FrameBody) val;
- if (frameBody instanceof Open && ((Open) frameBody).getMaxFrameSize() != null)
- {
- _frameSize = ((Open) frameBody).getMaxFrameSize().intValue();
- }
- response = new PerformativeResponse((short) channel, frameBody);
- }
- else if (val instanceof SaslFrameBody)
- {
- SaslFrameBody frameBody = (SaslFrameBody) val;
- response = new SaslPerformativeResponse((short) channel, frameBody);
-
- if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK))
- {
- resetInputHandlerAfterSaslOutcome();
- }
- }
- else
- {
- throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
- }
-
- _responseQueue.add(response);
- }
- }
-
- @Override
- public void receiveSaslInit(final SaslInit saslInit)
- {
-
- }
-
- @Override
- public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
- {
-
- }
-
- @Override
- public void receiveSaslChallenge(final SaslChallenge saslChallenge)
- {
-
- }
-
- @Override
- public void receiveSaslResponse(final SaslResponse saslResponse)
- {
-
- }
-
- @Override
- public void receiveSaslOutcome(final SaslOutcome saslOutcome)
- {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 52518ab..7d73ce8 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -20,8 +20,6 @@
package org.apache.qpid.tests.protocol.v1_0;
-import static com.google.common.util.concurrent.Futures.allAsList;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,21 +28,19 @@ import static org.hamcrest.Matchers.is;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -81,8 +77,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.Response;
-public class Interaction
+public class Interaction extends org.apache.qpid.tests.protocol.Interaction<Interaction>
{
private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Begin _begin;
@@ -94,14 +91,11 @@ public class Interaction
private final Flow _flow;
private final Transfer _transfer;
private final Disposition _disposition;
- private final FrameTransport _transport;
private final SaslInit _saslInit;
private final SaslResponse _saslResponse;
private byte[] _protocolHeader;
private UnsignedShort _connectionChannel;
private UnsignedShort _sessionChannel;
- private Response<?> _latestResponse;
- private ListenableFuture<?> _latestFuture;
private int _deliveryIdCounter;
private List<Transfer> _latestDelivery;
private Object _decodedLatestDelivery;
@@ -109,10 +103,10 @@ public class Interaction
Interaction(final FrameTransport frameTransport)
{
+ super(frameTransport);
final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO;
- _transport = frameTransport;
- _protocolHeader = "AMQP\0\1\0\0".getBytes(UTF_8);
+ _protocolHeader = frameTransport.getProtocolHeader();
_saslInit = new SaslInit();
_saslResponse = new SaslResponse();
@@ -154,6 +148,19 @@ public class Interaction
_disposition.setFirst(UnsignedInteger.ZERO);
}
+ public void doCloseConnection() throws Exception
+ {
+ Close close = new Close();
+
+ sendPerformative(close, UnsignedShort.valueOf((short) 0));
+ Response<?> response = getNextResponse();
+ if (!(response.getBody() instanceof Close))
+ {
+ throw new IllegalStateException(String.format(
+ "Unexpected response to connection Close. Expected Close got '%s'", response.getBody()));
+ }
+ }
+
/////////////////////////
// Protocol Negotiation //
/////////////////////////
@@ -164,17 +171,15 @@ public class Interaction
return this;
}
- public Interaction negotiateProtocol() throws Exception
+ @Override
+ protected byte[] getProtocolHeader()
+ {
+ return _protocolHeader;
+ }
+
+ @Override
+ protected Interaction getInteraction()
{
- final ListenableFuture<Void> future = _transport.sendProtocolHeader(_protocolHeader);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
return this;
}
@@ -977,83 +982,35 @@ public class Interaction
private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
+ SASLFrame transportFrame = new SASLFrame(frameBody);
+ sendPerformativeAndChainFuture(transportFrame, true);
}
private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception
{
- final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, channel);
- if (_latestFuture != null)
- {
- _latestFuture = allAsList(_latestFuture, future);
- }
- else
- {
- _latestFuture = future;
- }
- }
-
- public Interaction consumeResponse(final Class<?>... responseTypes) throws Exception
- {
- sync();
- _latestResponse = _transport.getNextResponse();
- final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
- if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
- || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+ final TransportFrame transportFrame;
+ try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null)
{
- return this;
- }
- acceptableResponseClasses.remove(null);
- if (_latestResponse != null)
- {
- for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ final QpidByteBuffer duplicate;
+ if (payload == null)
{
- if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- return this;
- }
+ duplicate = null;
+ }
+ else
+ {
+ duplicate = payload.duplicate();
+ }
+ transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate);
+ ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame, false);
+ if (frameBody instanceof Transfer)
+ {
+ listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor());
+ }
+ if (duplicate != null)
+ {
+ listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor());
}
}
- throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
- acceptableResponseClasses,
- _latestResponse == null ? null : _latestResponse.getBody()));
- }
-
- public Interaction sync() throws InterruptedException, ExecutionException, TimeoutException
- {
- if (_latestFuture != null)
- {
- _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
- _latestFuture = null;
- }
- return this;
- }
-
- public Response<?> getLatestResponse() throws Exception
- {
- sync();
- return _latestResponse;
- }
-
- public <T> T getLatestResponse(Class<T> type) throws Exception
- {
- sync();
- if (!type.isAssignableFrom(_latestResponse.getBody().getClass()))
- {
- throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
- type.getSimpleName(),
- _latestResponse.getBody()));
- }
-
- return (T) _latestResponse.getBody();
}
public Interaction flowHandleFromLinkHandle()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
deleted file mode 100644
index 029dc70..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Matchers.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.util.Arrays;
-
-import org.hamcrest.BaseMatcher;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-
-public class Matchers
-{
- public static Matcher<Response> protocolHeader(byte[] expectedHeader)
- {
- return new BaseMatcher<Response>()
- {
- @Override
- public void describeTo(final Description description)
- {
- description.appendValue(new HeaderResponse(expectedHeader));
- }
-
- @Override
- public boolean matches(final Object o)
- {
- if (o == null)
- {
- return false;
- }
- if (!(o instanceof HeaderResponse))
- {
- return false;
- }
- if (!Arrays.equals(expectedHeader, ((HeaderResponse) o).getBody()))
- {
- return false;
- }
- return true;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
deleted file mode 100644
index 68f4322..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
-import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
-import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class OutputHandler extends ChannelOutboundHandlerAdapter
-{
- private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
- .registerTransportLayer()
- .registerMessagingLayer()
- .registerTransactionLayer()
- .registerSecurityLayer()
- .registerExtensionSoleconnLayer();
-
-
- @Override
- public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
- {
-
- if (msg instanceof AMQFrame)
- {
- FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
- {
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- @Override
- public void send(final QpidByteBuffer msg)
- {
- byte[] data = new byte[msg.remaining()];
- msg.get(data);
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(data);
- try
- {
- OutputHandler.super.write(ctx, buffer, promise);
- }
- catch (Exception e)
- {
- promise.setFailure(e);
- }
- }
-
- @Override
- public void flush()
- {
- }
-
- @Override
- public void close()
- {
-
- }
- });
- _frameWriter.send(((AMQFrame) msg));
- }
- else
- {
- super.write(ctx, msg, promise);
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
index 06a64dc..9e03a26 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
@@ -20,6 +20,7 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.tests.protocol.Response;
public class PerformativeResponse implements Response<FrameBody>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
deleted file mode 100644
index a7e341c..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-public interface Response<T>
-{
- T getBody();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
index 08893e0..02ab3c9 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java
@@ -21,6 +21,7 @@
package org.apache.qpid.tests.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.tests.protocol.Response;
public class SaslPerformativeResponse implements Response<SaslFrameBody>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
deleted file mode 100644
index ea3d164..0000000
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SpecificationTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.tests.protocol.v1_0;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.METHOD)
-public @interface SpecificationTest
-{
- String section();
- String description();
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index 8a150fa..1ca4419 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -54,6 +54,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 817be05..a3270a0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
@@ -108,7 +108,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
interaction.consumeResponse().getLatestResponse(Flow.class);
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
index c75bb20..524d991 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/management/ManagementTest.java
@@ -46,10 +46,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -125,7 +124,7 @@ public class ManagementTest extends BrokerAdminUsingTestBase
assertThat(flow.getLinkCredit(), is(equalTo(UnsignedInteger.ZERO)));
assertThat(flow.getHandle(), is(equalTo(receiverResponse.getHandle())));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
index a39b1b3..e85f356 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/websocket/WebSocketTest.java
@@ -39,10 +39,11 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class WebSocketTest extends BrokerAdminUsingTestBase
{
@@ -73,7 +74,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
try (FrameTransport transport = new WebSocketFrameTransport(addr).splitAmqpFrames().connect())
{
- final Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ final Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse()
.getLatestResponse(Open.class);
@@ -84,7 +86,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -97,7 +99,8 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQPWS);
try (FrameTransport transport = new WebSocketFrameTransport(addr).connect())
{
- final Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ final Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse()
.getLatestResponse(Open.class);
@@ -108,7 +111,7 @@ public class WebSocketTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
index 6ba9058..36203a4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/DeleteOnCloseTest.java
@@ -48,7 +48,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
public class DeleteOnCloseTest extends BrokerAdminUsingTestBase
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 97fe3bb..1cf1ff0 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -43,9 +43,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index bcd155f..0a45410 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -52,9 +52,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
index 1853446..cc377cc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/OutcomeTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -99,7 +99,7 @@ public class OutcomeTest extends BrokerAdminUsingTestBase
assertThat(secondDeliveryPayload, is(equalTo("message2")));
// verify that no unexpected performative is received by closing
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index eb72532..245c624 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -76,8 +76,8 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -439,7 +439,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.disposition();
// verify that no unexpected performative is received by closing
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -688,8 +688,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
assertThat(isSettled.get(), is(true));
// verify no unexpected performative received by closing the connection
- transport.doCloseConnection();
-
+ interaction.doCloseConnection();
}
}
@@ -796,7 +795,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.transfer()
.sync();
- transport.doCloseConnection();
+ interaction.doCloseConnection();
assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(2)));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 42f6114..bd7c113 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -58,7 +58,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 1496d13..fb61974 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -57,9 +57,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
index 619e5d9..db91db1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
@@ -30,7 +30,7 @@ import org.junit.Test;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class ProtocolHeaderTest extends BrokerAdminUsingTestBase
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index b3f57c1..ab570da 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -42,7 +42,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class OpenTest extends BrokerAdminUsingTestBase
@@ -77,7 +77,8 @@ public class OpenTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
{
- Open responseOpen = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ Open responseOpen = interaction
.negotiateProtocol().consumeResponse()
.openContainerId("testContainerId")
.open().consumeResponse()
@@ -88,7 +89,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
assertThat(responseOpen.getChannelMax().intValue(),
is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/deab4580/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index c559399..774cc00 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -43,7 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[6/6] qpid-broker-j git commit: QPID-8042: Fix defect that prevents
pipelining with header within protocol tests
Posted by kw...@apache.org.
QPID-8042: Fix defect that prevents pipelining with header within protocol tests
Revert saslSuccessfulAuthenticationWithPipelinedFrames to a readable style.
Cherry picked from 1c382be
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/7d8391ff
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/7d8391ff
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/7d8391ff
Branch: refs/heads/7.0.x
Commit: 7d8391ff7f5946e46c5c3840c628d80debe9e433
Parents: 33cb908
Author: Keith Wall <kw...@apache.org>
Authored: Wed Jan 10 11:02:49 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Wed Jan 10 14:40:30 2018 +0000
----------------------------------------------------------------------
.../v1_0/transport/security/sasl/SaslTest.java | 49 +++++---------------
.../qpid/tests/protocol/OutputHandler.java | 9 ++++
2 files changed, 20 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7d8391ff/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index 1b81507..ee657f3 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -37,23 +36,17 @@ import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.xml.bind.DatatypeConverter;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
-import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
-import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
-import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.protocol.SpecificationTest;
-import org.apache.qpid.tests.protocol.v1_0.FrameEncoder;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -123,38 +116,18 @@ public class SaslTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP);
try (FrameTransport transport = new FrameTransport(addr, true).connect())
{
+ final Binary initialResponse =
+ new Binary(String.format("\0%s\0%s", _username, _password).getBytes(StandardCharsets.US_ASCII));
final Interaction interaction = transport.newInteraction();
- FrameEncoder frameEncoder = new FrameEncoder();
-
- SaslInit saslInit = new SaslInit();
- saslInit.setMechanism(PLAIN);
- saslInit.setInitialResponse(new Binary(String.format("\0%s\0%s", _username, _password)
- .getBytes(StandardCharsets.US_ASCII)));
- ByteBuffer saslInitByteBuffer = frameEncoder.encode(new SASLFrame(saslInit));
-
- Open open = new Open();
- open.setContainerId("containerId");
- ByteBuffer openByteBuffer = frameEncoder.encode(new TransportFrame(0, open));
-
- int initSize = saslInitByteBuffer.remaining();
- int openSize = openByteBuffer.remaining();
- int dataLength = SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize + openSize;
- byte[] data = new byte[dataLength];
-
- System.arraycopy(SASL_AMQP_HEADER_BYTES, 0, data, 0, SASL_AMQP_HEADER_BYTES.length);
- saslInitByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length, initSize);
- System.arraycopy(AMQP_HEADER_BYTES,
- 0,
- data,
- SASL_AMQP_HEADER_BYTES.length + initSize,
- AMQP_HEADER_BYTES.length);
- openByteBuffer.get(data, SASL_AMQP_HEADER_BYTES.length + AMQP_HEADER_BYTES.length + initSize, openSize);
-
- ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
- buffer.writeBytes(data);
-
- transport.sendPerformative(buffer);
-
+ interaction.protocolHeader(SASL_AMQP_HEADER_BYTES)
+ .negotiateProtocol()
+ .saslMechanism(PLAIN)
+ .saslInitialResponse(initialResponse)
+ .saslInit()
+ .protocolHeader(AMQP_HEADER_BYTES)
+ .negotiateProtocol()
+ .openContainerId("testContainerId")
+ .open();
final byte[] saslHeaderResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
assertThat(saslHeaderResponse, is(equalTo(SASL_AMQP_HEADER_BYTES)));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/7d8391ff/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
index 5d40447..fae1ce4 100644
--- a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -51,6 +51,15 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter
{
send(ctx, byteBuffer, promise);
}
+ else if (msg instanceof ByteBuf)
+ {
+ ByteBuf buf = (ByteBuf) msg;
+ final ByteBuffer bytes = ByteBuffer.allocate(buf.readableBytes());
+ buf.readBytes(bytes.array());
+ buf.release();
+
+ send(ctx, bytes, promise);
+ }
else
{
super.write(ctx, msg, promise);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org