You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/18 01:42:20 UTC
[2/3] qpid-broker-j git commit: QPID-8038: [Broker-J][System Tests]
Introduce new module 'protocol-tests-core' and move test common functionality
into it
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
index 35ee4e7..ad56ea9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -69,9 +69,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.Response;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -459,7 +459,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
}
}
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
@@ -588,7 +588,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
.disposition();
}
- transport.doCloseConnection();
+ interaction.doCloseConnection();
if (getBrokerAdmin().isQueueDepthSupported())
{
@@ -672,7 +672,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
.dispositionRole(Role.RECEIVER)
.disposition();
- transport.doCloseConnection();
+ interaction.doCloseConnection();
if (getBrokerAdmin().isQueueDepthSupported())
{
@@ -771,7 +771,7 @@ public class ResumeDeliveriesTest extends BrokerAdminUsingTestBase
}
while (!settled);
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
index cf12b05..6fea928 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java
@@ -49,7 +49,7 @@ import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class SaslTest extends BrokerAdminUsingTestBase
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index 352fd19..24cb435 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -36,10 +36,11 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.SpecificationTest;
public class BeginTest extends BrokerAdminUsingTestBase
{
@@ -74,7 +75,8 @@ public class BeginTest extends BrokerAdminUsingTestBase
try (FrameTransport transport = new FrameTransport(addr).connect())
{
final UnsignedShort channel = UnsignedShort.valueOf(37);
- Begin responseBegin = transport.newInteraction()
+ Interaction interaction = transport.newInteraction();
+ Begin responseBegin = interaction
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.sessionChannel(channel)
@@ -85,7 +87,7 @@ public class BeginTest extends BrokerAdminUsingTestBase
assertThat(responseBegin.getOutgoingWindow(), is(instanceOf(UnsignedInteger.class)));
assertThat(responseBegin.getNextOutgoingId(), is(instanceOf(UnsignedInteger.class)));
- transport.doCloseConnection();
+ interaction.doCloseConnection();
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/pom.xml
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/pom.xml b/systests/protocol-tests-core/pom.xml
new file mode 100644
index 0000000..fa561ed
--- /dev/null
+++ b/systests/protocol-tests-core/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ ~
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.1.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-core</artifactId>
+ <name>Apache Qpid Broker-J Protocol Tests Core</name>
+ <description>Core classes for Apache Qpid protocol tests</description>
+
+ <!-- Dependencies are declared without versions here. -->
+ <!-- NOTE(review): versions are presumably managed by the parent POM; confirm. -->
+ <!-- NOTE(review): InputHandler in this module imports org.slf4j, but no slf4j
+      dependency is declared below. Presumably it is supplied by the parent POM
+      or arrives transitively; confirm, or declare it explicitly. -->
+ <dependencies>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
new file mode 100644
index 0000000..daf500d
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/FrameTransport.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Protocol-independent client transport used by the protocol tests.
+ * <p>
+ * It opens a Netty NIO socket channel to the broker address, pushes outgoing
+ * objects through an {@link OutputHandler} and collects everything produced by
+ * the {@link InputHandler} in an internal bounded queue, from which tests pull
+ * responses one at a time.
+ */
+public abstract class FrameTransport implements AutoCloseable
+{
+    /**
+     * Maximum time, in milliseconds, to wait for a response. Read from the system
+     * property {@code qpid.tests.protocol.frameTransport.responseTimeout}; defaults to 6000.
+     */
+    public static final long RESPONSE_TIMEOUT =
+            Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
+    /** Sentinel placed on the response queue when the channel's close future completes. */
+    private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
+
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
+    private final EventLoopGroup _workerGroup;
+    private final InetSocketAddress _brokerAddress;
+    private final InputHandler _inputHandler;
+    private final OutputHandler _outputHandler;
+
+    private volatile Channel _channel;
+    private volatile boolean _channelClosedSeen = false;
+
+    /**
+     * Creates an unconnected transport; call {@link #connect()} to open the channel.
+     *
+     * @param brokerAddress address to connect to
+     * @param inputDecoder  decoder handed to the inbound handler
+     * @param outputEncoder encoder handed to the outbound handler
+     */
+    public FrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
+    {
+        _brokerAddress = brokerAddress;
+        _inputHandler = new InputHandler(_queue, inputDecoder);
+        _outputHandler = new OutputHandler(outputEncoder);
+        _workerGroup = new NioEventLoopGroup();
+    }
+
+    /**
+     * @return the address this transport connects to
+     */
+    public InetSocketAddress getBrokerAddress()
+    {
+        return _brokerAddress;
+    }
+
+    /**
+     * Connects to the broker address and blocks until the connection attempt completes.
+     * A listener on the channel's close future records the closure and enqueues a
+     * channel-closed sentinel response.
+     *
+     * @return this transport
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling
+     *                          thread is interrupted while waiting; the interrupt flag
+     *                          is restored before the exception is thrown
+     */
+    public FrameTransport connect()
+    {
+        try
+        {
+            Bootstrap b = new Bootstrap();
+            b.group(_workerGroup);
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new ChannelInitializer<SocketChannel>()
+            {
+                @Override
+                public void initChannel(SocketChannel ch) throws Exception
+                {
+                    ChannelPipeline pipeline = ch.pipeline();
+                    buildInputOutputPipeline(pipeline);
+                }
+            });
+
+            _channel = b.connect(_brokerAddress).sync().channel();
+            _channel.closeFuture().addListener(future ->
+            {
+                _channelClosedSeen = true;
+                _queue.add(CHANNEL_CLOSED_RESPONSE);
+            });
+        }
+        catch (InterruptedException e)
+        {
+            // Wrapping the exception would otherwise lose the interruption; restore the
+            // flag so code further up the stack can still observe it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        return this;
+    }
+
+    /**
+     * Installs the handlers on a newly initialised channel pipeline: the input handler
+     * followed by the output handler. Subclasses may override to build a different pipeline.
+     *
+     * @param pipeline pipeline of the channel being initialised
+     */
+    protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
+    {
+        pipeline.addLast(_inputHandler).addLast(_outputHandler);
+    }
+
+    /**
+     * Disconnects and closes the channel (if one was opened), then shuts down the
+     * event loop group. The group is shut down even if closing the channel fails.
+     */
+    @Override
+    public void close() throws Exception
+    {
+        try
+        {
+            if (_channel != null)
+            {
+                _channel.disconnect().sync();
+                _channel.close().sync();
+                _channel = null;
+            }
+        }
+        finally
+        {
+            _workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
+        }
+    }
+
+    /**
+     * Writes the given raw bytes to the channel and flushes.
+     *
+     * @param bytes protocol header bytes to send
+     * @return a future tied to the write promise
+     * @throws IllegalStateException if the transport is not connected
+     */
+    public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        ChannelPromise promise = _channel.newPromise();
+        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+        buffer.writeBytes(bytes);
+        _channel.write(buffer, promise);
+        _channel.flush();
+        return JdkFutureAdapters.listenInPoolThread(promise);
+    }
+
+    /**
+     * Writes an object to the channel and flushes.
+     *
+     * @param data object to write; it passes through the output handler
+     * @param sync when {@code true}, block until the write has completed and return an
+     *             already-completed future; when {@code false}, return a future tied
+     *             to the write promise without waiting
+     * @return a future for the write
+     * @throws IllegalStateException if the transport is not connected
+     */
+    public ListenableFuture<Void> sendPerformative(final Object data, boolean sync) throws Exception
+    {
+        Preconditions.checkState(_channel != null, "Not connected");
+        if (!sync)
+        {
+            ChannelPromise promise = _channel.newPromise();
+            _channel.write(data, promise);
+            _channel.flush();
+            return JdkFutureAdapters.listenInPoolThread(promise);
+        }
+        else
+        {
+            ChannelFuture channelFuture = _channel.writeAndFlush(data);
+            channelFuture.sync();
+            return Futures.immediateFuture(null);
+        }
+    }
+
+    /**
+     * Takes the next queued response, waiting up to {@link #RESPONSE_TIMEOUT} milliseconds.
+     * The result is cast unchecked to the caller's expected type.
+     *
+     * @return the next response, or {@code null} if none arrived within the timeout
+     */
+    public <T extends Response<?>> T getNextResponse() throws Exception
+    {
+        return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Asserts that the next response is either absent (timeout) or the channel-closed
+     * sentinel. May block for up to the response timeout.
+     */
+    public void assertNoMoreResponses() throws Exception
+    {
+        Response response = getNextResponse();
+        assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
+    }
+
+    /**
+     * As {@link #assertNoMoreResponses()}, and additionally asserts that closure of the
+     * channel has been observed.
+     */
+    public void assertNoMoreResponsesAndChannelClosed() throws Exception
+    {
+        assertNoMoreResponses();
+        assertThat(_channelClosedSeen, is(true));
+    }
+
+    /** Sentinel response signalling that the channel was closed; it has no body. */
+    private static class ChannelClosedResponse implements Response<Void>
+    {
+        @Override
+        public String toString()
+        {
+            return "ChannelClosed";
+        }
+
+        @Override
+        public Void getBody()
+        {
+            return null;
+        }
+    }
+
+    /**
+     * @return the protocol header bytes for the concrete protocol
+     */
+    public abstract byte[] getProtocolHeader();
+
+    /**
+     * @return a new interaction bound to this transport
+     */
+    protected abstract Interaction newInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
new file mode 100644
index 0000000..9767b40
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/HeaderResponse.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+/**
+ * Response whose body is a raw protocol header.
+ * The byte array is held and returned by reference; no copy is made.
+ */
+public class HeaderResponse implements Response<byte[]>
+{
+    private final byte[] _header;
+
+    /**
+     * @param header header bytes as received
+     */
+    public HeaderResponse(final byte[] header)
+    {
+        this._header = header;
+    }
+
+    /**
+     * Renders the header bytes for diagnostics.
+     */
+    @Override
+    public String toString()
+    {
+        final StringBuilder text = new StringBuilder("HeaderResponse{");
+        text.append("_header=").append(Arrays.toString(_header));
+        text.append('}');
+        return text.toString();
+    }
+
+    /**
+     * @return the header bytes passed to the constructor
+     */
+    @Override
+    public byte[] getBody()
+    {
+        return this._header;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
new file mode 100644
index 0000000..369cfd1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputDecoder.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+/**
+ * Protocol-specific decoder that turns received bytes into {@link Response} objects.
+ * <p>
+ * {@code InputHandler} passes in the bytes accumulated so far, adds every returned
+ * response to the response queue, and keeps whatever remains in the buffer after the
+ * call for the next read.
+ * NOTE(review): implementations are therefore presumably expected to advance the
+ * buffer position past the bytes they have fully decoded; confirm against the
+ * concrete decoders.
+ */
+public interface InputDecoder
+{
+ /**
+  * Decodes responses from the given buffer.
+  *
+  * @param inputBuffer buffer holding the bytes received and not yet consumed
+  * @return the decoded responses
+  * @throws Exception if decoding fails
+  */
+ Collection<Response<?>> decode(final ByteBuffer inputBuffer) throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
new file mode 100644
index 0000000..2d5fb45
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/InputHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inbound Netty handler that accumulates received bytes, runs them through an
+ * {@link InputDecoder} and puts the decoded responses on a queue.
+ * Bytes the decoder leaves unconsumed are retained and prepended to the next read.
+ */
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+
+    private final BlockingQueue<Response<?>> _responseQueue;
+    private final InputDecoder _inputDecoder;
+
+    /** Bytes received but not yet consumed by the decoder; always left ready for reading. */
+    private ByteBuffer _inputBuffer = ByteBuffer.allocate(0);
+
+    /**
+     * @param queue        queue that receives every decoded response
+     * @param inputDecoder decoder applied to the accumulated input
+     */
+    InputHandler(final BlockingQueue<Response<?>> queue, InputDecoder inputDecoder)
+    {
+        _responseQueue = queue;
+        _inputDecoder = inputDecoder;
+    }
+
+    /**
+     * Copies the incoming buffer, appends it to any leftover bytes, decodes and queues
+     * the resulting responses. The incoming message is always released, including when
+     * decoding or queueing throws.
+     *
+     * @param ctx handler context (unused)
+     * @param msg the received message; must be a {@link ByteBuf}
+     */
+    @Override
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+    {
+        try
+        {
+            ByteBuf buf = (ByteBuf) msg;
+            // Copy into a heap buffer so nothing refers to the Netty buffer once it is released.
+            ByteBuffer byteBuffer = ByteBuffer.allocate(buf.readableBytes());
+            byteBuffer.put(buf.nioBuffer());
+            byteBuffer.flip();
+            LOGGER.debug("Incoming {} byte(s)", byteBuffer.remaining());
+
+            if (_inputBuffer.hasRemaining())
+            {
+                // Leftover bytes from an earlier read go first, followed by the new data.
+                ByteBuffer old = _inputBuffer;
+                _inputBuffer = ByteBuffer.allocate(_inputBuffer.remaining() + byteBuffer.remaining());
+                _inputBuffer.put(old);
+                _inputBuffer.put(byteBuffer);
+                _inputBuffer.flip();
+            }
+            else
+            {
+                _inputBuffer = byteBuffer;
+            }
+
+            _responseQueue.addAll(_inputDecoder.decode(_inputBuffer));
+
+            LOGGER.debug("After parsing, {} byte(s) remained", _inputBuffer.remaining());
+
+            if (_inputBuffer.hasRemaining())
+            {
+                // Move the unconsumed tail to the front and make it readable again.
+                _inputBuffer.compact();
+                _inputBuffer.flip();
+            }
+        }
+        finally
+        {
+            // Release on every path; previously a decoder failure leaked the buffer.
+            ReferenceCountUtil.release(msg);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
new file mode 100644
index 0000000..238c0a5
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Interaction.java
@@ -0,0 +1,141 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import static com.google.common.util.concurrent.Futures.allAsList;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Base class for a fluent, protocol-specific conversation with the broker over a
+ * {@link FrameTransport}. It tracks the futures of outstanding sends and the most
+ * recently consumed response.
+ *
+ * @param <I> concrete interaction type returned by the fluent methods
+ */
+public abstract class Interaction<I extends Interaction>
+{
+    private final FrameTransport _transport;
+    /** Combined future of the sends issued since the last {@link #sync()}; {@code null} if none. */
+    private ListenableFuture<?> _latestFuture;
+    /** Response obtained by the last {@link #consumeResponse(Class[])}; may be {@code null}. */
+    private Response<?> _latestResponse;
+
+    /**
+     * @param frameTransport transport used to send data and read responses
+     */
+    public Interaction(final FrameTransport frameTransport)
+    {
+        _transport = frameTransport;
+    }
+
+    /**
+     * Waits for outstanding sends, reads the next response and checks its type.
+     * <ul>
+     * <li>With no types given, any non-null response is accepted.</li>
+     * <li>A {@code null} element among the types makes "no response" acceptable.</li>
+     * <li>Otherwise the response body must be an instance of one of the given types.</li>
+     * </ul>
+     * A response whose body is {@code null} matches no type.
+     *
+     * @param responseTypes acceptable body types; may contain {@code null}
+     * @return this interaction
+     * @throws IllegalStateException if the response is not acceptable
+     */
+    public I consumeResponse(final Class<?>... responseTypes) throws Exception
+    {
+        sync();
+        _latestResponse = getNextResponse();
+        final Set<Class<?>> acceptableResponseClasses = new HashSet<>(Arrays.asList(responseTypes));
+        if ((acceptableResponseClasses.isEmpty() && _latestResponse != null)
+            || (acceptableResponseClasses.contains(null) && _latestResponse == null))
+        {
+            return getInteraction();
+        }
+        acceptableResponseClasses.remove(null);
+        // The body may legitimately be null (the transport's channel-closed response has
+        // none); treat that as "matches nothing" rather than failing on getClass().
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        if (body != null)
+        {
+            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+            {
+                if (acceptableResponseClass.isInstance(body))
+                {
+                    return getInteraction();
+                }
+            }
+        }
+        throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
+                                                      acceptableResponseClasses,
+                                                      body));
+    }
+
+    /**
+     * @return the next response from the transport, or {@code null} if none arrived in time
+     */
+    protected Response<?> getNextResponse() throws Exception
+    {
+        return _transport.getNextResponse();
+    }
+
+    /**
+     * Waits, for at most {@link FrameTransport#RESPONSE_TIMEOUT} milliseconds, for all
+     * sends issued so far to complete.
+     *
+     * @return this interaction
+     */
+    public I sync() throws InterruptedException, ExecutionException, TimeoutException
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture.get(FrameTransport.RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+            _latestFuture = null;
+        }
+        return getInteraction();
+    }
+
+    /**
+     * @return the most recently consumed response, after waiting for outstanding sends;
+     *         may be {@code null}
+     */
+    public Response<?> getLatestResponse() throws Exception
+    {
+        sync();
+        return _latestResponse;
+    }
+
+    /**
+     * Returns the body of the most recently consumed response as the requested type.
+     *
+     * @param type expected body type
+     * @return the response body
+     * @throws IllegalStateException if there is no latest response, its body is
+     *                               {@code null}, or the body is not an instance of {@code type}
+     */
+    public <T> T getLatestResponse(Class<T> type) throws Exception
+    {
+        sync();
+        final Object body = _latestResponse == null ? null : _latestResponse.getBody();
+        // isInstance(null) is false, so a missing response or body is reported as a
+        // mismatch instead of surfacing as a NullPointerException.
+        if (!type.isInstance(body))
+        {
+            throw new IllegalStateException(String.format("Unexpected response. Expected '%s' got '%s'.",
+                                                          type.getSimpleName(),
+                                                          body));
+        }
+
+        return type.cast(body);
+    }
+
+    /**
+     * Sends an object through the transport and remembers its future so that a later
+     * {@link #sync()} waits for it.
+     *
+     * @param frameBody object to send
+     * @param sync      passed through to {@link FrameTransport#sendPerformative(Object, boolean)}
+     * @return the future of this particular send
+     */
+    protected ListenableFuture<Void> sendPerformativeAndChainFuture(final Object frameBody, boolean sync) throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendPerformative(frameBody, sync);
+        chainFuture(future);
+        return future;
+    }
+
+    /**
+     * Sends the protocol header returned by {@link #getProtocolHeader()}.
+     *
+     * @return this interaction
+     */
+    public I negotiateProtocol() throws Exception
+    {
+        final ListenableFuture<Void> future = _transport.sendProtocolHeader(getProtocolHeader());
+        chainFuture(future);
+        return getInteraction();
+    }
+
+    /** Combines the given future with any outstanding one so that sync() waits for both. */
+    private void chainFuture(final ListenableFuture<?> future)
+    {
+        if (_latestFuture != null)
+        {
+            _latestFuture = allAsList(_latestFuture, future);
+        }
+        else
+        {
+            _latestFuture = future;
+        }
+    }
+
+    /**
+     * @return the transport this interaction operates on
+     */
+    protected FrameTransport getTransport()
+    {
+        return _transport;
+    }
+
+    /**
+     * @return the protocol header bytes sent by {@link #negotiateProtocol()}
+     */
+    protected abstract byte[] getProtocolHeader();
+
+    /**
+     * @return this interaction as its concrete type, for fluent chaining
+     */
+    protected abstract I getInteraction();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
new file mode 100644
index 0000000..292ae9a
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Matchers.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.util.Arrays;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+
+/**
+ * Hamcrest matchers for protocol test responses.
+ */
+public class Matchers
+{
+    /**
+     * Creates a matcher that accepts only a {@link HeaderResponse} whose body equals
+     * the expected header byte-for-byte.
+     *
+     * @param expectedHeader header bytes the response must carry
+     * @return the matcher
+     */
+    public static Matcher<Response> protocolHeader(byte[] expectedHeader)
+    {
+        return new BaseMatcher<Response>()
+        {
+            @Override
+            public boolean matches(final Object candidate)
+            {
+                // instanceof is false for null, so no separate null check is needed.
+                return candidate instanceof HeaderResponse
+                       && Arrays.equals(expectedHeader, ((HeaderResponse) candidate).getBody());
+            }
+
+            @Override
+            public void describeTo(final Description description)
+            {
+                final HeaderResponse expected = new HeaderResponse(expectedHeader);
+                description.appendValue(expected);
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
new file mode 100644
index 0000000..a6a4a47
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputEncoder.java
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Protocol-specific encoder that turns an outgoing object into bytes.
+ * <p>
+ * {@code OutputHandler} sends the returned bytes when the result is non-null; when the
+ * result is {@code null} it passes the original message on unchanged.
+ */
+public interface OutputEncoder
+{
+ /**
+  * Encodes an outgoing message.
+  *
+  * @param msg object being written to the channel
+  * @return the bytes to send, or {@code null} if this encoder does not handle {@code msg}
+  */
+ ByteBuffer encode(Object msg);
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
new file mode 100644
index 0000000..40a2ca7
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/OutputHandler.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol;
+
+import java.nio.ByteBuffer;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+/**
+ * Outbound channel handler that lets an {@link OutputEncoder} turn written objects into bytes.
+ * <p>
+ * Every message passed to {@link #write} is offered to the encoder. When the encoder returns a buffer,
+ * the remaining bytes of that buffer are copied into a Netty {@link ByteBuf}, which is written in place
+ * of the original message. When the encoder returns {@code null}, the original message is forwarded
+ * unchanged.
+ */
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+    private final OutputEncoder _outputEncoder;
+
+    /**
+     * @param outputEncoder encoder consulted for every outbound message
+     */
+    OutputHandler(final OutputEncoder outputEncoder)
+    {
+        _outputEncoder = outputEncoder;
+    }
+
+    /**
+     * Encodes {@code msg} if the encoder produces bytes for it, otherwise passes it on untouched.
+     *
+     * @param ctx     context used to forward the write
+     * @param msg     the object being written
+     * @param promise promise forwarded with the write; failed here if sending the encoded bytes throws
+     * @throws Exception if the encoder throws or forwarding an un-encoded message fails
+     */
+    @Override
+    public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+    {
+        final ByteBuffer byteBuffer = _outputEncoder.encode(msg);
+        if (byteBuffer != null)
+        {
+            send(ctx, byteBuffer, promise);
+        }
+        else
+        {
+            super.write(ctx, msg, promise);
+        }
+    }
+
+    /**
+     * Copies the remaining bytes of {@code dataByteBuffer} into a Netty buffer and writes it.
+     * The position of {@code dataByteBuffer} is advanced to its limit by the copy.
+     */
+    private void send(final ChannelHandlerContext ctx, final ByteBuffer dataByteBuffer, final ChannelPromise promise)
+    {
+        // Size the buffer to the payload and copy straight from the NIO buffer; no intermediate array.
+        final ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(dataByteBuffer.remaining());
+        try
+        {
+            buffer.writeBytes(dataByteBuffer);
+            super.write(ctx, buffer, promise);
+        }
+        catch (Exception e)
+        {
+            // The write did not complete normally, so nobody else may release the buffer.
+            // Guard on the reference count in case it has already been released downstream.
+            if (buffer.refCnt() > 0)
+            {
+                buffer.release();
+            }
+            // tryFailure instead of setFailure: setFailure throws IllegalStateException when the
+            // promise is already complete, which would mask the original error.
+            promise.tryFailure(e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
new file mode 100644
index 0000000..debc06f
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/Response.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+/**
+ * Holder for the body of a protocol response.
+ *
+ * @param <T> type of the body exposed by this response
+ */
+public interface Response<T>
+{
+    /**
+     * @return the body carried by this response
+     */
+    T getBody();
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/06e53d7a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
new file mode 100644
index 0000000..db6d7a1
--- /dev/null
+++ b/systests/protocol-tests-core/src/main/java/org/apache/qpid/tests/protocol/SpecificationTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Method-level annotation tying a test to a part of a specification.
+ * <p>
+ * It is retained at runtime and may only be applied to methods.
+ */
+@Target(ElementType.METHOD)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface SpecificationTest
+{
+    /**
+     * @return the specification section the annotated method relates to
+     */
+    String section();
+
+    /**
+     * @return free-text description accompanying the section reference
+     */
+    String description();
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org