You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/04/07 16:38:57 UTC
svn commit: r1790583 [1/2] - in /qpid/java/trunk: ./
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/
broker-plugins/amqp-1-0...
Author: lquack
Date: Fri Apr 7 16:38:56 2017
New Revision: 1790583
URL: http://svn.apache.org/viewvc?rev=1790583&view=rev
Log:
QPID-7665: [Java Broker] Protocol system test suite
Added:
qpid/java/trunk/systests/protocol-tests-amqp-1-0/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/pom.xml
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
- copied, changed from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdminFactory.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java
- copied, changed from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java
- copied, changed from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ProtocolTestBase.java
- copied, changed from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/QpidTestRunner.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java
- copied, changed from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/resources/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/
qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Symbol.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
qpid/java/trunk/pom.xml
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Fri Apr 7 16:38:56 2017
@@ -63,6 +63,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
@@ -1252,7 +1253,7 @@ public class AMQPConnection_1_0Impl exte
send(new SASLFrame(mechanisms), null);
_frameReceivingState = FrameReceivingState.SASL_INIT_ONLY;
- _frameHandler = new FrameHandler(this, true);
+ _frameHandler = getFrameHandler(true);
}
else if(Arrays.equals(header, AMQP_HEADER))
{
@@ -1278,7 +1279,7 @@ public class AMQPConnection_1_0Impl exte
}
getSender().send(QpidByteBuffer.wrap(AMQP_HEADER));
_frameReceivingState = FrameReceivingState.OPEN_ONLY;
- _frameHandler = new FrameHandler(this, false);
+ _frameHandler = getFrameHandler(false);
}
else
@@ -1292,6 +1293,11 @@ public class AMQPConnection_1_0Impl exte
}
+ private FrameHandler getFrameHandler(final boolean sasl)
+ {
+ return new FrameHandler(new ValueHandler(this.getDescribedTypeRegistry()), this, sasl);
+ }
+
public void closed()
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java Fri Apr 7 16:38:56 2017
@@ -24,6 +24,7 @@ import org.apache.qpid.server.bytebuffer
public interface ProtocolHandler
{
+ // TODO rv is unused
ProtocolHandler parse(QpidByteBuffer in);
boolean isDone();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java Fri Apr 7 16:38:56 2017
@@ -26,7 +26,7 @@ import java.util.Formatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler;
import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
@@ -35,33 +35,23 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
public class FrameHandler implements ProtocolHandler
{
public static Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class);
private final boolean _isSasl;
- private ConnectionHandler _connection;
- private ValueHandler _typeHandler;
+ private final ConnectionHandler _connectionHandler;
+ private final ValueHandler _valueHandler;
private boolean _errored = false;
- public FrameHandler(final ValueHandler typeHandler, final ConnectionHandler connection, final boolean isSasl)
+ public FrameHandler(final ValueHandler valueHandler, final ConnectionHandler connectionHandler, final boolean isSasl)
{
- _typeHandler = typeHandler;
- _connection = connection;
+ _valueHandler = valueHandler;
+ _connectionHandler = connectionHandler;
_isSasl = isSasl;
}
- public FrameHandler(final AMQPConnection_1_0 connection, final boolean sasl)
- {
- this(new ValueHandler(connection.getDescribedTypeRegistry()), connection, sasl);
-
- }
-
-
-
-
public ProtocolHandler parse(QpidByteBuffer in)
{
try
@@ -90,12 +80,12 @@ public class FrameHandler implements Pro
break;
}
- if(size > _connection.getMaxFrameSize())
+ if(size > _connectionHandler.getMaxFrameSize())
{
frameParsingError = createFramingError(
"specified frame size %d larger than maximum frame header size %d",
size,
- _connection.getMaxFrameSize());
+ _connectionHandler.getMaxFrameSize());
break;
}
@@ -156,7 +146,7 @@ public class FrameHandler implements Pro
try
{
- Object val = dup.hasRemaining() ? _typeHandler.parse(dup) : null;
+ Object val = dup.hasRemaining() ? _valueHandler.parse(dup) : null;
if (dup.hasRemaining())
{
@@ -174,7 +164,7 @@ public class FrameHandler implements Pro
val);
}
}
- _connection.receive((short) channel, val);
+ _connectionHandler.receive((short) channel, val);
}
catch (AmqpErrorException ex)
{
@@ -188,7 +178,7 @@ public class FrameHandler implements Pro
if (frameParsingError != null)
{
- _connection.handleError(frameParsingError);
+ _connectionHandler.handleError(frameParsingError);
_errored = true;
}
@@ -197,7 +187,7 @@ public class FrameHandler implements Pro
{
LOGGER.warn("Unexpected exception handling frame", e);
// This exception is unexpected. The up layer should handle error condition gracefully
- _connection.handleError(this.createError(ConnectionError.CONNECTION_FORCED, e.toString()));
+ _connectionHandler.handleError(this.createError(ConnectionError.CONNECTION_FORCED, e.toString()));
}
return this;
}
@@ -220,6 +210,6 @@ public class FrameHandler implements Pro
public boolean isDone()
{
- return _errored|| _connection.closedForInput();
+ return _errored || _connectionHandler.closedForInput();
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java Fri Apr 7 16:38:56 2017
@@ -28,7 +28,7 @@ public final class TransportFrame extend
private final short _channel;
- TransportFrame(short channel, FrameBody frameBody)
+ public TransportFrame(short channel, FrameBody frameBody)
{
super(frameBody);
_channel = channel;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Symbol.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Symbol.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Symbol.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Symbol.java Fri Apr 7 16:38:56 2017
@@ -61,6 +61,23 @@ public final class Symbol implements Com
}
@Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final Symbol symbol = (Symbol) o;
+
+ return _underlying.equals(symbol._underlying);
+ }
+
+ @Override
public int hashCode()
{
return _underlying.hashCode();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedShort.java Fri Apr 7 16:38:56 2017
@@ -27,6 +27,7 @@ public final class UnsignedShort extends
private final short _underlying;
private static final UnsignedShort[] cachedValues = new UnsignedShort[256];
+ public static final UnsignedShort MAX_VALUE = new UnsignedShort((short) 0xffff);
static
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java Fri Apr 7 16:38:56 2017
@@ -70,11 +70,36 @@ public class AmqpError
_val = val;
}
+ @Override
public Symbol getValue()
{
return _val;
}
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final AmqpError amqpError = (AmqpError) o;
+
+ return _val.equals(amqpError._val);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return _val.hashCode();
+ }
+
+ @Override
public String toString()
{
Modified: qpid/java/trunk/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/pom.xml?rev=1790583&r1=1790582&r2=1790583&view=diff
==============================================================================
--- qpid/java/trunk/pom.xml (original)
+++ qpid/java/trunk/pom.xml Fri Apr 7 16:38:56 2017
@@ -141,6 +141,8 @@
<!-- test dependency version numbers -->
<junit-version>4.11</junit-version>
<mockito-version>1.9.5</mockito-version>
+ <netty-version>4.1.9.Final</netty-version>
+ <hamcrest-version>1.3</hamcrest-version>
<httpclient-version>4.4</httpclient-version>
<qpid-jms-client-version>0.21.0</qpid-jms-client-version>
<qpid-jms-client-amqp-0-x-version>6.1.2</qpid-jms-client-amqp-0-x-version>
@@ -185,6 +187,7 @@
<module>qpid-test-utils</module>
<module>systests</module>
<module>systests/qpid-systests-jms_2.0</module>
+ <module>systests/protocol-tests-amqp-1-0</module>
<module>perftests</module>
<module>qpid-perftests-systests</module>
@@ -353,6 +356,41 @@
<version>${mockito-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${netty-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${netty-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>${netty-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <version>${hamcrest-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ <version>${hamcrest-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ <version>${hamcrest-version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/pom.xml?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/pom.xml (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/pom.xml Fri Apr 7 16:38:56 2017
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests-parent</artifactId>
+ <version>7.0.0-SNAPSHOT</version>
+ <relativePath>../../qpid-systests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>protocol-tests-amqp-1-0</artifactId>
+ <name>Apache Qpid Protocol Tests for AMQP 1.0</name>
+ <description>Tests of AMQP 1.0</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-codegen</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-link-store</artifactId>
+ <version>${project.version}</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-systests</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-bdbstore</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sleepycat</groupId>
+ <artifactId>je</artifactId>
+ <scope>provided</scope>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-integration</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
Copied: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java (from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java?p2=qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java&r1=1790578&r2=1790583&rev=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java Fri Apr 7 16:38:56 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,15 +15,29 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
-public interface ProtocolHandler
+import org.apache.qpid.server.plugin.Pluggable;
+
+public interface BrokerAdmin extends Pluggable
{
- ProtocolHandler parse(QpidByteBuffer in);
+ void beforeTestClass(final Class testClass);
+ void beforeTestMethod(final Class testClass, final Method method);
+ void afterTestMethod(final Class testClass, final Method method);
+ void afterTestClass(final Class testClass);
+
+ InetSocketAddress getBrokerAddress(PortType portType);
+
+ void createQueue(String queueName);
- boolean isDone();
+ enum PortType
+ {
+ ANONYMOUS_AMQP,
+ AMQP
+ }
}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdminFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdminFactory.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdminFactory.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdminFactory.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2017 by Lorenz Quack
+ *
+ * This file is part of qpid-java-build.
+ *
+ * qpid-java-build is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * qpid-java-build is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with qpid-java-build. If not, see <http://www.gnu.org/licenses/>.
+ */
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.Map;
+
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+
+public class BrokerAdminFactory
+{
+ BrokerAdmin createInstance(String type)
+ {
+ Map<String, BrokerAdmin> adminFacades = new QpidServiceLoader().getInstancesByType(BrokerAdmin.class);
+ BrokerAdmin brokerAdmin = adminFacades.get(type);
+ if (brokerAdmin == null)
+ {
+ throw new RuntimeException(String.format("Could not find BrokerAdmin implementation of type '%s'", type));
+ }
+ return brokerAdmin;
+ }
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.SystemLauncher;
+import org.apache.qpid.server.SystemLauncherListener;
+import org.apache.qpid.server.logging.logback.LogbackLoggingSystemLauncherListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Container;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.SystemConfig;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.store.MemoryConfigurationStore;
+import org.apache.qpid.server.virtualhostnode.JsonVirtualHostNode;
+
+@PluggableService
+public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedBrokerPerClassAdminImpl.class);
+ private final Map<String, Integer> _ports = new HashMap<>();
+ private SystemLauncher _systemLauncher;
+ private Container<?> _broker;
+ private VirtualHostNode<?> _currentVirtualHostNode;
+
+ @Override
+ public void beforeTestClass(final Class testClass)
+ {
+ LOGGER.info("beforeTestClass " + testClass.getSimpleName());
+ try
+ {
+ Path qpidWorkDir = Files.createTempDirectory("qpid-work-");
+
+ Map<String,String> context = new HashMap<>();
+ context.put("qpid.work_dir", qpidWorkDir.toString());
+ context.put("qpid.port.protocol_handshake_timeout", "1000000");
+
+ Map<String,Object> systemConfigAttributes = new HashMap<>();
+ systemConfigAttributes.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:config-protocol-tests.json");
+ systemConfigAttributes.put(ConfiguredObject.CONTEXT, context);
+ systemConfigAttributes.put(ConfiguredObject.TYPE, System.getProperty("broker.config-store-type", "JSON"));
+
+ if (Thread.getDefaultUncaughtExceptionHandler() == null)
+ {
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler());
+ }
+
+ LOGGER.info("Starting internal broker (same JVM)");
+
+ List<SystemLauncherListener> systemLauncherListeners = new ArrayList<>();
+ systemLauncherListeners.add(new LogbackLoggingSystemLauncherListener());
+ systemLauncherListeners.add(new ShutdownLoggingSystemLauncherListener());
+ systemLauncherListeners.add(new PortExtractingLauncherListener());
+ _systemLauncher = new SystemLauncher(systemLauncherListeners.toArray(new SystemLauncherListener[systemLauncherListeners.size()]));
+
+ _systemLauncher.startup(systemConfigAttributes);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to start broker for test class", e);
+ }
+ }
+
+ @Override
+ public void beforeTestMethod(final Class testClass, final Method method)
+ {
+ LOGGER.info("beforeTestMethod " + testClass.getSimpleName() + "#" + method.getName());
+
+ final String virtualHostNodeName = testClass.getSimpleName() + "_" + method.getName();
+ final String storeType = System.getProperty("virtualhostnode.type");
+
+ String storeDir = null;
+ if (System.getProperty("profile", "").startsWith("java-dby-mem"))
+ {
+ storeDir = ":memory:";
+ }
+ else if (!MemoryConfigurationStore.TYPE.equals(storeType))
+ {
+ storeDir = "${qpid.work_dir}" + File.separator + virtualHostNodeName;
+ }
+
+ String blueprint = System.getProperty("virtualhostnode.context.blueprint");
+
+ Map<String, Object> attributes = new HashMap<>();
+ attributes.put(VirtualHostNode.NAME, virtualHostNodeName);
+ attributes.put(VirtualHostNode.TYPE, storeType);
+ attributes.put(VirtualHostNode.CONTEXT, Collections.singletonMap("virtualhostBlueprint", blueprint));
+ attributes.put(VirtualHostNode.DEFAULT_VIRTUAL_HOST_NODE, true);
+ attributes.put(VirtualHostNode.VIRTUALHOST_INITIAL_CONFIGURATION, blueprint);
+ if (storeDir != null)
+ {
+ attributes.put(JsonVirtualHostNode.STORE_PATH, storeDir);
+ }
+
+ _currentVirtualHostNode = _broker.createChild(VirtualHostNode.class, attributes);
+ }
+
+ @Override
+ public void afterTestMethod(final Class testClass, final Method method)
+ {
+ LOGGER.info("afterTestMethod " + testClass.getSimpleName() + "#" + method.getName());
+ if (!Boolean.getBoolean("qpid.tests.protocols.keepVirtualHosts"))
+ {
+ _currentVirtualHostNode.delete();
+ }
+ }
+
+ @Override
+ public void afterTestClass(final Class testClass)
+ {
+ LOGGER.info("afterTestClass " + testClass.getSimpleName());
+ _systemLauncher.shutdown();
+ _ports.clear();
+ }
+
+ @Override
+ public InetSocketAddress getBrokerAddress(final PortType portType)
+ {
+ Integer port = _ports.get(portType.name());
+ if (port == null)
+ {
+ throw new IllegalStateException(String.format("Could not find port with name '%s' on the Broker", portType.name()));
+ }
+ return new InetSocketAddress(port);
+ }
+
+ @Override
+ public void createQueue(final String queueName)
+ {
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Queue.NAME, queueName);
+ attributes.put(Queue.TYPE, "standard");
+ _currentVirtualHostNode.getVirtualHost().createChild(Queue.class, attributes);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "EMBEDDED_BROKER_PER_CLASS";
+ }
+
+ private class PortExtractingLauncherListener implements SystemLauncherListener
+ {
+ private SystemConfig<?> _systemConfig;
+
+ @Override
+ public void beforeStartup()
+ {
+
+ }
+
+ @Override
+ public void errorOnStartup(final RuntimeException e)
+ {
+
+ }
+
+ @Override
+ public void afterStartup()
+ {
+
+ if (_systemConfig == null)
+ {
+ throw new IllegalStateException("System config is required");
+ }
+
+ _broker = _systemConfig.getContainer();
+ Collection<Port> ports = _broker.getChildren(Port.class);
+ for (Port port : ports)
+ {
+ _ports.put(port.getName(), port.getBoundPort());
+ }
+ }
+
+ @Override
+ public void onContainerResolve(final SystemConfig<?> systemConfig)
+ {
+ _systemConfig = systemConfig;
+ }
+
+ @Override
+ public void onContainerClose(final SystemConfig<?> systemConfig)
+ {
+
+ }
+
+ @Override
+ public void onShutdown(final int exitCode)
+ {
+
+ }
+
+ @Override
+ public void exceptionOnShutdown(final Exception e)
+ {
+
+ }
+ }
+
+
+ private static class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler
+ {
+ private final AtomicInteger _count = new AtomicInteger(0);
+
+ @Override
+ public void uncaughtException(final Thread t, final Throwable e)
+ {
+ System.err.print("Thread terminated due to uncaught exception");
+ e.printStackTrace();
+
+ LOGGER.error("Uncaught exception from thread {}", t.getName(), e);
+ _count.getAndIncrement();
+ }
+
+ public int getAndResetCount()
+ {
+ int count;
+ do
+ {
+ count = _count.get();
+ }
+ while (!_count.compareAndSet(count, 0));
+ return count;
+ }
+ }
+
+ private class ShutdownLoggingSystemLauncherListener extends SystemLauncherListener.DefaultSystemLauncherListener
+ {
+ @Override
+ public void onShutdown(final int exitCode)
+ {
+ _systemLauncher = null;
+ }
+
+ @Override
+ public void exceptionOnShutdown(final Exception e)
+ {
+ if (e instanceof IllegalStateException
+ || e instanceof IllegalStateTransitionException)
+ {
+ System.out.println(
+ "IllegalStateException occurred on broker shutdown in test ");
+ }
+ }
+ }
+
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExternalQpidBrokerAdminImpl.class);
+
+ public ExternalQpidBrokerAdminImpl()
+ {
+ LOGGER.debug("QpidBrokerAdminImpl ctor");
+ }
+
+ @Override
+ public void beforeTestClass(final Class testClass)
+ {
+ LOGGER.debug("beforeTestClass");
+ }
+
+ @Override
+ public void beforeTestMethod(final Class testClass, final Method method)
+ {
+ LOGGER.debug("beforeTestMethod");
+ }
+
+ @Override
+ public void afterTestMethod(final Class testClass, final Method method)
+ {
+ LOGGER.debug("afterTestMethod");
+ }
+
+ @Override
+ public void afterTestClass(final Class testClass)
+ {
+ LOGGER.debug("afterTestClass");
+ }
+
+ @Override
+ public InetSocketAddress getBrokerAddress(final PortType portType)
+ {
+ Integer port;
+ switch (portType)
+ {
+ case AMQP:
+ port = Integer.getInteger("qpid.tests.protocol.broker.external.port.standard");
+ break;
+ case ANONYMOUS_AMQP:
+ port = Integer.getInteger("qpid.tests.protocol.broker.external.port.anonymous");
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown port type '%s'", portType));
+ }
+ return new InetSocketAddress(port);
+ }
+
+ @Override
+ public void createQueue(final String queueName)
+ {
+ LOGGER.debug(String.format("creation of queue '%s' requested", queueName));
+ }
+
+ @Override
+ public String getType()
+ {
+ return "EXTERNAL_BROKER";
+ }
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import static org.junit.Assert.assertNull;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.util.concurrent.JdkFutureAdapters;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+
+public class FrameTransport implements AutoCloseable
+{
+ private static final long RESPONSE_TIMEOUT = 6000;
+ private static final AtomicInteger _amqpConnectionIdGenerator = new AtomicInteger(1);
+ private static final AtomicInteger _amqpChannelIdGenerator = new AtomicInteger(1);
+
+ private final Channel _channel;
+ private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100);
+ private final EventLoopGroup _workerGroup;
+
+ private int _amqpConnectionId;
+ private short _amqpChannelId;
+
+ public FrameTransport(final InetSocketAddress brokerAddress)
+ {
+ _workerGroup = new NioEventLoopGroup();
+
+ try
+ {
+ Bootstrap b = new Bootstrap();
+ b.group(_workerGroup);
+ b.channel(NioSocketChannel.class);
+ b.option(ChannelOption.SO_KEEPALIVE, true);
+ b.handler(new ChannelInitializer<SocketChannel>()
+ {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception
+ {
+ ch.pipeline().addLast(new InputHandler(_queue))
+ .addLast(new OutputHandler());
+ }
+ });
+
+ _channel = b.connect(brokerAddress).sync().channel();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception
+ {
+ try
+ {
+ _channel.close().sync();
+ }
+ finally
+ {
+ _workerGroup.shutdownGracefully();
+ }
+ }
+
+ public ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
+ {
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(bytes);
+ ChannelFuture channelFuture = _channel.writeAndFlush(buffer);
+ channelFuture.sync();
+ return JdkFutureAdapters.listenInPoolThread(channelFuture);
+ }
+
+ public ListenableFuture<Void> sendPerformative(final TransportFrame transportFrame) throws Exception
+ {
+ ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame);
+ channelFuture.sync();
+ return JdkFutureAdapters.listenInPoolThread(channelFuture);
+ }
+
+ public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception
+ {
+ TransportFrame transportFrame = new TransportFrame(_amqpChannelId, frameBody);
+ return sendPerformative(transportFrame);
+ }
+
+ public Response getNextResponse() throws Exception
+ {
+ return _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
+ public void doProtocolNegotiation() throws Exception
+ {
+ byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+ sendProtocolHeader(bytes);
+ HeaderResponse response = (HeaderResponse) getNextResponse();
+
+ if (!Arrays.equals(bytes, response.getHeader()))
+ {
+ throw new IllegalStateException("Unexpected protocol header");
+ }
+ }
+
+ public void doOpenConnection() throws Exception
+ {
+ doProtocolNegotiation();
+ Open open = new Open();
+ _amqpConnectionId = _amqpConnectionIdGenerator.getAndIncrement();
+ open.setContainerId(String.format("testContainer-%d", _amqpConnectionId));
+ TransportFrame transportFrame = new TransportFrame((short) 0, open);
+ sendPerformative(transportFrame);
+ PerformativeResponse response = (PerformativeResponse) getNextResponse();
+ if (!(response.getFrameBody() instanceof Open))
+ {
+ throw new IllegalStateException("Unexpected response to connection Open");
+ }
+ }
+
+ public void doBeginSession() throws Exception
+ {
+ doOpenConnection();
+ Begin begin = new Begin();
+ begin.setNextOutgoingId(UnsignedInteger.ZERO);
+ begin.setIncomingWindow(UnsignedInteger.ZERO);
+ begin.setOutgoingWindow(UnsignedInteger.ZERO);
+ _amqpChannelId = (short) _amqpChannelIdGenerator.getAndIncrement();
+ sendPerformative(new TransportFrame(_amqpChannelId, begin));
+ PerformativeResponse response = (PerformativeResponse) getNextResponse();
+ if (!(response.getFrameBody() instanceof Begin))
+ {
+ throw new IllegalStateException("Unexpected response to connection Begin");
+ }
+ }
+
+ public void assertNoMoreResponses() throws Exception
+ {
+ Response response = getNextResponse();
+ assertNull("Unexpected response.", response);
+ }
+}
Copied: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java (from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java?p2=qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java&r1=1790578&r2=1790583&rev=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/HeaderResponse.java Fri Apr 7 16:38:56 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,15 +15,31 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.Arrays;
-public interface ProtocolHandler
+public class HeaderResponse implements Response
{
- ProtocolHandler parse(QpidByteBuffer in);
+ private final byte[] _header;
+
+ public HeaderResponse(final byte[] header)
+ {
+ _header = header;
+ }
+
+ public byte[] getHeader()
+ {
+ return _header;
+ }
- boolean isDone();
+ @Override
+ public String toString()
+ {
+ return "HeaderResponse{" +
+ "_header=" + Arrays.toString(_header) +
+ '}';
+ }
}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import java.util.concurrent.BlockingQueue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.util.ReferenceCountUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome;
+import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+
+public class InputHandler extends ChannelInboundHandlerAdapter
+{
+ private enum ParsingState
+ {
+ HEADER,
+ PERFORMATIVES
+ };
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
+ private final ValueHandler _valueHandler;
+ private final FrameHandler _frameHandler;
+
+ private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0);
+ private BlockingQueue<Response> _responseQueue;
+ private ParsingState _state = ParsingState.HEADER;
+
+ public InputHandler(final BlockingQueue<Response> queue)
+ {
+
+ _valueHandler = new ValueHandler(TYPE_REGISTRY);
+ _frameHandler = new FrameHandler(_valueHandler, new MyConnectionHandler(), false);
+
+ _responseQueue = queue;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception
+ {
+ // TODO does Netty take care of saving the remaining bytes???
+ ByteBuf buf = (ByteBuf) msg;
+ QpidByteBuffer qpidBuf = QpidByteBuffer.wrap(buf.nioBuffer());
+
+ if (_inputBuffer.hasRemaining())
+ {
+ QpidByteBuffer old = _inputBuffer;
+ _inputBuffer = QpidByteBuffer.allocate(_inputBuffer.remaining() + qpidBuf.remaining());
+ _inputBuffer.put(old);
+ _inputBuffer.put(qpidBuf);
+ old.dispose();
+ qpidBuf.dispose();
+ _inputBuffer.flip();
+ }
+ else
+ {
+ _inputBuffer.dispose();
+ _inputBuffer = qpidBuf;
+ }
+
+ doParsing();
+
+ if (_inputBuffer.hasRemaining())
+ {
+ _inputBuffer.compact();
+ }
+
+ ReferenceCountUtil.release(msg);
+ }
+
+ private void doParsing()
+ {
+ switch(_state)
+ {
+ case HEADER:
+ if (_inputBuffer.remaining() >= 8)
+ {
+ byte[] header = new byte[8];
+ _inputBuffer.get(header);
+ _responseQueue.add(new HeaderResponse(header));
+ _state = ParsingState.PERFORMATIVES;
+ doParsing();
+ }
+ break;
+ case PERFORMATIVES:
+ _frameHandler.parse(_inputBuffer);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state : " + _state);
+ }
+ }
+
+ @Override
+ public void channelReadComplete(final ChannelHandlerContext ctx) throws Exception
+ {
+ LOGGER.debug("KWDEBUG channelReadComplete");
+ super.channelReadComplete(ctx);
+ }
+
+ private class MyConnectionHandler implements ConnectionHandler
+ {
+ @Override
+ public void receiveOpen(final short channel, final Open close)
+ {
+ System.out.println();
+ }
+
+ @Override
+ public void receiveClose(final short channel, final Close close)
+ {
+
+ }
+
+ @Override
+ public void receiveBegin(final short channel, final Begin begin)
+ {
+
+ }
+
+ @Override
+ public void receiveEnd(final short channel, final End end)
+ {
+
+ }
+
+ @Override
+ public void receiveAttach(final short channel, final Attach attach)
+ {
+
+ }
+
+ @Override
+ public void receiveDetach(final short channel, final Detach detach)
+ {
+
+ }
+
+ @Override
+ public void receiveTransfer(final short channel, final Transfer transfer)
+ {
+
+ }
+
+ @Override
+ public void receiveDisposition(final short channel, final Disposition disposition)
+ {
+
+ }
+
+ @Override
+ public void receiveFlow(final short channel, final Flow flow)
+ {
+
+ }
+
+ @Override
+ public int getMaxFrameSize()
+ {
+ return 512;
+ }
+
+ @Override
+ public void handleError(final Error parsingError)
+ {
+
+ }
+
+ @Override
+ public boolean closedForInput()
+ {
+ return false;
+ }
+
+ @Override
+ public void receive(final short channel, final Object val)
+ {
+ Response response;
+ if (val instanceof FrameBody)
+ {
+ response = new PerformativeResponse(channel, (FrameBody) val);
+ }
+ else if (val instanceof SaslFrameBody)
+ {
+ throw new UnsupportedOperationException("TODO: ");
+ }
+ else
+ {
+ throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass());
+ }
+ _responseQueue.add(response);
+
+ }
+
+ @Override
+ public void receiveSaslInit(final SaslInit saslInit)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslChallenge(final SaslChallenge saslChallenge)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslResponse(final SaslResponse saslResponse)
+ {
+
+ }
+
+ @Override
+ public void receiveSaslOutcome(final SaslOutcome saslOutcome)
+ {
+
+ }
+ }
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame;
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.transport.ByteBufferSender;
+
+public class OutputHandler extends ChannelOutboundHandlerAdapter
+{
+ private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
+
+
+ @Override
+ public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception
+ {
+
+ if (msg instanceof TransportFrame)
+ {
+ FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender()
+ {
+ @Override
+ public boolean isDirectBufferPreferred()
+ {
+ return false;
+ }
+
+ @Override
+ public void send(final QpidByteBuffer msg)
+ {
+ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
+ buffer.writeBytes(msg.asByteBuffer());
+ try
+ {
+ OutputHandler.super.write(ctx, buffer, promise);
+ }
+ catch (Exception e)
+ {
+ promise.setFailure(e);
+ }
+ }
+
+ @Override
+ public void flush()
+ {
+ }
+
+ @Override
+ public void close()
+ {
+
+ }
+ });
+ _frameWriter.send((TransportFrame) msg);
+ }
+ else
+ {
+ super.write(ctx, msg, promise);
+ }
+ }
+
+
+}
Copied: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java (from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java?p2=qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java&r1=1790578&r2=1790583&rev=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/TransportFrame.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/PerformativeResponse.java Fri Apr 7 16:38:56 2017
@@ -16,37 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.qpid.server.protocol.v1_0.framing;
-import java.util.List;
+package org.apache.qpid.tests.protocol.v1_0;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
-public final class TransportFrame extends AMQFrame<FrameBody>
+public class PerformativeResponse implements Response
{
+ private final short _channelId;
+ private final FrameBody _frameBody;
- private final short _channel;
-
- TransportFrame(short channel, FrameBody frameBody)
+ public PerformativeResponse(final short channelId,
+ final FrameBody frameBody)
{
- super(frameBody);
- _channel = channel;
+ _channelId = channelId;
+ _frameBody = frameBody;
}
- public TransportFrame(short channel, FrameBody frameBody, List<QpidByteBuffer> payload)
+ public FrameBody getFrameBody()
{
- super(frameBody, payload);
- _channel = channel;
+ return _frameBody;
}
- @Override public short getChannel()
+ public short getChannelId()
{
- return _channel;
+ return _channelId;
}
- @Override public byte getFrameType()
+ @Override
+ public String toString()
{
- return (byte)0;
+ return "PerformativeResponse{" +
+ "_channelId=" + _channelId +
+ ", _frameBody=" + _frameBody +
+ '}';
}
}
Copied: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ProtocolTestBase.java (from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ProtocolTestBase.java?p2=qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ProtocolTestBase.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java&r1=1790578&r2=1790583&rev=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ProtocolTestBase.java Fri Apr 7 16:38:56 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,15 +15,28 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public interface ProtocolHandler
+@RunWith(QpidTestRunner.class)
+public abstract class ProtocolTestBase
{
- ProtocolHandler parse(QpidByteBuffer in);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolTestBase.class);
+
+ private BrokerAdmin _brokerAdmin;
+
+ public void init(final BrokerAdmin brokerAdmin)
+ {
+ _brokerAdmin = brokerAdmin;
+ }
- boolean isDone();
+ public BrokerAdmin getBrokerAdmin()
+ {
+ return _brokerAdmin;
+ }
}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/QpidTestRunner.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/QpidTestRunner.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/QpidTestRunner.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/QpidTestRunner.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.InitializationError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QpidTestRunner extends BlockJUnit4ClassRunner
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(QpidTestRunner.class);
+
+ private final BrokerAdmin _brokerAdmin;
+ private final Class _testClass;
+
+ public QpidTestRunner(final Class<?> klass) throws InitializationError
+ {
+ super(klass);
+ _testClass = klass;
+ _brokerAdmin = (new BrokerAdminFactory()).createInstance("EMBEDDED_BROKER_PER_CLASS");
+
+ LOGGER.debug("Runner ctor " + klass.getSimpleName());
+ }
+
+ @Override
+ protected Object createTest() throws Exception
+ {
+ Object test = super.createTest();
+ ProtocolTestBase qpidTest = ((ProtocolTestBase) test);
+ qpidTest.init(_brokerAdmin);
+ return test;
+ }
+
+ @Override
+ public void run(final RunNotifier notifier)
+ {
+ _brokerAdmin.beforeTestClass(_testClass);
+ try
+ {
+ super.run(notifier);
+ }
+ finally
+ {
+ _brokerAdmin.afterTestClass(_testClass);
+ }
+ }
+
+ @Override
+ protected void runChild(final FrameworkMethod method, final RunNotifier notifier)
+ {
+ _brokerAdmin.beforeTestMethod(_testClass, method.getMethod());
+ try
+ {
+ super.runChild(method, notifier);
+ }
+ finally
+ {
+ _brokerAdmin.afterTestMethod(_testClass, method.getMethod());
+ }
+ }
+}
Copied: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java (from r1790578, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java?p2=qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java&r1=1790578&r2=1790583&rev=1790583&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ProtocolHandler.java (original)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Response.java Fri Apr 7 16:38:56 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,15 +15,10 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.qpid.server.protocol.v1_0.codec;
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+package org.apache.qpid.tests.protocol.v1_0;
-public interface ProtocolHandler
+public interface Response
{
- ProtocolHandler parse(QpidByteBuffer in);
-
- boolean isDone();
}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json Fri Apr 7 16:38:56 2017
@@ -0,0 +1,85 @@
+{
+ "name" : "${broker.name}",
+ "modelVersion" : "7.0",
+ "authenticationproviders" : [ {
+ "name" : "anon",
+ "type" : "Anonymous"
+ }, {
+ "name" : "plain",
+ "type" : "Plain",
+ "users" : [ {
+ "name" : "admin",
+ "type" : "managed",
+ "password" : "admin"
+ }, {
+ "name" : "guest",
+ "type" : "managed",
+ "password" : "guest"
+ } ]
+ } ],
+ "brokerloggers" : [ {
+ "name" : "logfile",
+ "type" : "File",
+ "fileName" : "${qpid.work_dir}${file.separator}log${file.separator}qpid.log",
+ "brokerloginclusionrules" : [ {
+ "name" : "Operational",
+ "type" : "NameAndLevel",
+ "level" : "INFO",
+ "loggerName" : "qpid.message.*"
+ }, {
+ "name" : "Qpid",
+ "type" : "NameAndLevel",
+ "durable" : true,
+ "level" : "DEBUG",
+ "loggerName" : "org.apache.qpid.*"
+ }, {
+ "name" : "Root",
+ "type" : "NameAndLevel",
+ "level" : "WARN",
+ "loggerName" : "ROOT"
+ } ]
+ } ],
+ "ports" : [ {
+ "name" : "AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "plain",
+ "port" : "0",
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias"
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias"
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias"
+ } ]
+ }, {
+ "name" : "ANONYMOUS_AMQP",
+ "type" : "AMQP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "AMQP_1_0" ],
+ "virtualhostaliases" : [ {
+ "name" : "defaultAlias",
+ "type" : "defaultAlias",
+ "durable" : true
+ }, {
+ "name" : "hostnameAlias",
+ "type" : "hostnameAlias",
+ "durable" : true
+ }, {
+ "name" : "nameAlias",
+ "type" : "nameAlias",
+ "durable" : true
+ } ]
+ }, {
+ "name" : "HTTP",
+ "type" : "HTTP",
+ "authenticationProvider" : "anon",
+ "port" : "0",
+ "protocols" : [ "HTTP" ]
+ } ],
+ "virtualhostnodes" : []
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport;
+
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.HeaderResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+
+
+/*
+
+TODO
+
+logging - log per test?
+protocol assertions
+admin factory
+performative test
+embedded broker per test admin impl that creates broker per test
+embedded broker per class admin impl creates/destroys vhost per test
+queue creation?
+ */
+
+
+public class ProtocolHeaderTest extends ProtocolTestBase
+{
+ @Test
+ public void successfulHeaderExchange() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try(FrameTransport transport = new FrameTransport(addr))
+ {
+ byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8);
+ transport.sendProtocolHeader(bytes);
+ HeaderResponse response = (HeaderResponse) transport.getNextResponse();
+ assertArrayEquals("Unexpected protocol header response", bytes, response.getHeader());
+ }
+
+ }
+}
Added: qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java?rev=1790583&view=auto
==============================================================================
--- qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java (added)
+++ qpid/java/trunk/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java Fri Apr 7 16:38:56 2017
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.connection;
+
+import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+
+
+public class OpenTest extends ProtocolTestBase
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(OpenTest.class);
+
+ @Test
+ public void emptyOpen() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doProtocolNegotiation();
+ Open open = new Open();
+ TransportFrame transportFrame = new TransportFrame((short) 0, open);
+ transport.sendPerformative(transportFrame);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Close.class)));
+ Close responseClose = (Close) response.getFrameBody();
+ assertThat(responseClose.getError(), is(notNullValue()));
+ assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ }
+ }
+
+ @Test
+ public void successfulOpen() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doProtocolNegotiation();
+ Open open = new Open();
+ open.setContainerId("testContainerId");
+ TransportFrame transportFrame = new TransportFrame((short) 0, open);
+ transport.sendPerformative(transportFrame);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Open.class)));
+ Open responseOpen = (Open) response.getFrameBody();
+ assertThat(responseOpen.getContainerId(), is(notNullValue()));
+ assertThat(responseOpen.getMaxFrameSize().longValue(),
+ is(both(greaterThanOrEqualTo(0L)).and(lessThan(UnsignedInteger.MAX_VALUE.longValue()))));
+ assertThat(responseOpen.getChannelMax().intValue(),
+ is(both(greaterThanOrEqualTo(0)).and(lessThan(UnsignedShort.MAX_VALUE.intValue()))));
+ transport.assertNoMoreResponses();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org