You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/12 16:11:48 UTC

[3/3] qpid-broker-j git commit: QPID-8038: [Broker-J] Add AMQP 1.0 idle and protocol header timeout protocol tests, eliminating the old systems tests built using pieces of the old client

QPID-8038: [Broker-J] Add AMQP 1.0 idle and protocol header timeout protocol tests, eliminating the old systems tests built using pieces of the old client


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/91eb1cb2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/91eb1cb2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/91eb1cb2

Branch: refs/heads/master
Commit: 91eb1cb224e3cbff8646aa81e08114f01fa1e936
Parents: cc8b3c0
Author: Keith Wall <kw...@apache.org>
Authored: Mon Feb 12 13:54:58 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Feb 12 13:55:49 2018 +0000

----------------------------------------------------------------------
 .../ProtocolHeaderTimeoutTest.java              |  86 ++++
 .../transport/connection/IdleTimeoutTest.java   | 100 +++++
 .../qpid/transport/ProtocolNegotiationTest.java | 401 -------------------
 .../utils/EmbeddedBrokerPerClassAdminImpl.java  |  15 +-
 test-profiles/CPPExcludes                       |   4 -
 test-profiles/Java10Excludes                    |   3 -
 6 files changed, 196 insertions(+), 413 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
new file mode 100644
index 0000000..6402ad0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.protocoltimeout;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assume.assumeThat;
+
+import java.lang.reflect.Array;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import com.google.common.io.CharStreams;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@ConfigItem(name = AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT, value = "500")
+public class ProtocolHeaderTimeoutTest extends BrokerAdminUsingTestBase
+{
+    private InetSocketAddress _brokerAddress;
+
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @Test
+    public void noProtocolHeader() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+
+    @Test
+    public void incompleteProtocolHeader() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+
+            final byte[] protocolHeader = transport.getProtocolHeader();
+            byte[] buf = new byte[1];
+            for(int i = 0 ; i < (protocolHeader.length - 1 ); i++)
+            {
+                System.arraycopy(protocolHeader, i, buf, 0, 1);
+                transport.sendBytes(buf);
+                transport.flush();
+            }
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java
new file mode 100644
index 0000000..bcfa4a5
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.connection;
+
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.EmptyResponse;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
+@ConfigItem(name = AmqpPort.HEART_BEAT_DELAY, value = IdleTimeoutTest.IDLE_SECONDS)
+public class IdleTimeoutTest extends BrokerAdminUsingTestBase
+{
+    static final String IDLE_SECONDS = "1";
+    private static final int IDLE_TIMEOUT_MS = Integer.parseInt(IDLE_SECONDS) * 1000;
+
+    @Test
+    @SpecificationTest(section = "2.4.5",
+            description = "If the [idle timeout threshold] threshold is exceeded, then a peer SHOULD try to"
+                          + "gracefully close the connection using a close frame with an error explaining why.")
+    public void brokerClosesIdleConnection() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            Open responseOpen = interaction
+                    .negotiateProtocol().consumeResponse()
+                    .openContainerId("testContainerId")
+                    .open().consumeResponse()
+                    .getLatestResponse(Open.class);
+            assertThat(responseOpen.getIdleTimeOut().intValue(), is(equalTo(IDLE_TIMEOUT_MS)));
+
+            // TODO: defect - broker ought to be sending a close performative but it just closes the socket.
+            interaction.consumeResponse().getLatestResponse(ChannelClosedResponse.class);
+        }
+    }
+    @Test
+    @SpecificationTest(section = "2.4.5",
+            description = "If a peer needs to satisfy the need to send traffic to prevent idle timeout, and has "
+                          + "nothing to send, it MAY send an empty frame.")
+    public void idleLine() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            Open responseOpen = interaction
+                    .negotiateProtocol().consumeResponse()
+                    .openContainerId("testContainerId")
+                    .openIdleTimeOut(IDLE_TIMEOUT_MS)
+                    .open().consumeResponse()
+                    .getLatestResponse(Open.class);
+            assertThat(responseOpen.getIdleTimeOut().intValue(), is(equalTo(IDLE_TIMEOUT_MS)));
+
+            // Reflect the broker's empty frames
+            interaction.consumeResponse(EmptyResponse.class)
+                       .emptyFrame();
+
+            interaction.consumeResponse(EmptyResponse.class)
+                       .emptyFrame();
+
+            interaction.doCloseConnection();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java b/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
deleted file mode 100644
index f25bdd6..0000000
--- a/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody;
-import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation;
-import org.apache.qpid.server.protocol.ProtocolVersion;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.v0_10.ServerDisassembler;
-import org.apache.qpid.server.util.SystemUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-import org.apache.qpid.server.protocol.v0_10.transport.Frame;
-import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class ProtocolNegotiationTest extends QpidBrokerTestCase
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolNegotiationTest.class);
-    private static final int SO_TIMEOUT = 5000;
-    public static final int AMQP_HEADER_LEN = 8;
-    private Protocol _expectedProtocolInit;
-
-    private static final Map<Protocol, List<List<Byte>>> HEADERS = new HashMap<>();
-
-    static
-    {
-        HEADERS.put(Protocol.AMQP_0_8,   Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)8, (byte)0)));
-        HEADERS.put(Protocol.AMQP_0_9,   Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)0, (byte)9)));
-        HEADERS.put(Protocol.AMQP_0_9_1, Collections.singletonList(Arrays.asList((byte)0, (byte)0, (byte)9, (byte)1)));
-        HEADERS.put(Protocol.AMQP_0_10,  Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)0, (byte)10)));
-        HEADERS.put(Protocol.AMQP_1_0,   Arrays.asList(Arrays.asList((byte)3, (byte)1, (byte)0, (byte)0),
-                                                       Arrays.asList((byte)0, (byte)1, (byte)0, (byte)0)));
-
-    }
-
-    @Override
-    public void setUp() throws Exception
-    {
-        // restrict broker to support only single protocol
-        TestBrokerConfiguration config = getDefaultBrokerConfiguration();
-        config.setObjectAttribute(Port.class,
-                                  TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
-                                  Port.PROTOCOLS,
-                                  Arrays.asList(getBrokerProtocol()));
-        Map<String,String> overriddenPortContext = new HashMap<>();
-        overriddenPortContext.put(AmqpPort.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null);
-        overriddenPortContext.put(AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT, String.valueOf(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT));
-        config.setObjectAttribute(Port.class,
-                                  TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
-                                  Port.CONTEXT,
-                                  overriddenPortContext);
-        config.setBrokerAttribute(Broker.CONTEXT,
-                                  Collections.singletonMap(AmqpPort.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null));
-
-        super.setUp();
-        _expectedProtocolInit = getBrokerProtocol();
-    }
-
-    public void testWrongProtocolHeaderSent_BrokerRespondsWithSupportedProtocol() throws Exception
-    {
-        try(Socket socket = new Socket())
-        {
-            socket.setSoTimeout(SO_TIMEOUT);
-
-            final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
-            LOGGER.debug("Making connection to {}", inetSocketAddress);
-
-            socket.connect(inetSocketAddress);
-
-            assertTrue("Expected socket to be connected", socket.isConnected());
-
-            socket.getOutputStream().write("NOTANAMQPHEADER".getBytes());
-            byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
-            int len = socket.getInputStream().read(receivedHeader);
-            assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
-            assertEquals("Expected end-of-stream from socket signifying socket closed)",
-                         -1,
-                         socket.getInputStream().read());
-
-            ProtocolInitiation protocolInitiation = new ProtocolInitiation(QpidByteBuffer.wrap(receivedHeader));
-
-            assertTrue("Unexpected protocol initialisation", matchedExpectedVersion(receivedHeader));
-        }
-    }
-
-    private boolean matchedExpectedVersion(byte[] header)
-    {
-        if(header[0] != 'A' || header[1] != 'M' || header[2] != 'Q' || header[3] != 'P')
-        {
-            return false;
-        }
-        List<Byte> version = new ArrayList<>();
-        for(int i = 4; i<8; i++)
-        {
-            version.add(header[i]);
-        }
-        return HEADERS.get(_expectedProtocolInit).contains(version);
-    }
-
-    public void testNoProtocolHeaderSent_BrokerClosesConnection() throws Exception
-    {
-        try(Socket socket = new Socket())
-        {
-            socket.setSoTimeout(SO_TIMEOUT);
-
-            final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
-            LOGGER.debug("Making connection to {}", inetSocketAddress);
-
-            socket.connect(inetSocketAddress);
-
-            assertTrue("Expected socket to be connected", socket.isConnected());
-
-            int c = 0;
-            try
-            {
-                c = socket.getInputStream().read();
-                LOGGER.debug("Read {}", c);
-
-            }
-            catch(SocketTimeoutException ste)
-            {
-                fail("Broker did not close connection with no activity within expected timeout");
-            }
-
-            assertEquals("Expected end-of-stream from socket signifying socket closed)", -1, c);
-        }
-    }
-
-    public void testNoConnectionOpenSent_BrokerClosesConnection() throws Exception
-    {
-        setSystemProperty(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY, "1000");
-
-        try(Socket socket = new Socket())
-        {
-            socket.setSoTimeout(5000);
-
-            byte[] header = getHeaderBytesForBrokerVersion();
-
-            final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
-            LOGGER.debug("Making connection to {}", inetSocketAddress);
-
-            socket.connect(inetSocketAddress);
-
-            assertTrue("Expected socket to be connected", socket.isConnected());
-
-            OutputStream outputStream = socket.getOutputStream();
-            final TestSender sender = new TestSender(outputStream);
-            final InputStream inputStream = socket.getInputStream();
-
-            // write header
-            sender.send(QpidByteBuffer.wrap(header));
-            sender.flush();
-
-            // reader header
-            byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
-            int len = inputStream.read(receivedHeader);
-            assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
-
-            // read the server start / sasl mechanisms
-            inputStream.read(new byte[1024]);
-
-            // Send heartbeat frames to simulate a client that, although active, fails to
-            // authenticate within the allowed period
-
-            long timeout = System.currentTimeMillis() + 3000;
-            boolean brokenPipe = false;
-            while (timeout > System.currentTimeMillis())
-            {
-                if (!writeHeartbeat(sender))
-                {
-                    brokenPipe = true;
-                    break;
-                }
-                Thread.sleep(100);
-            }
-            // If AMQP 1.0 we won't have sent anything (heartbeats not valid until after SASL)
-            // Windows also seems to allow writes to a socket which has actually been closed.
-            if(!brokenPipe && (SystemUtils.isWindows() || isBroker10()))
-            {
-                final int read = inputStream.read(new byte[10]);
-                brokenPipe = -1 == read;
-            }
-            assertTrue("Expected pipe to become broken within "
-                       + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " timeout", brokenPipe);
-        }
-    }
-
-    private byte[] getHeaderBytesForBrokerVersion()
-    {
-        byte[] header = new byte[8];
-        header[0] = 'A';
-        header[1] = 'M';
-        header[2] = 'Q';
-        header[3] = 'P';
-        List<Byte> version = HEADERS.get(getBrokerProtocol()).iterator().next();
-        int i = 4;
-        for(byte b : version)
-        {
-            header[i++] = b;
-        }
-        return header;
-    }
-
-    public void testIllegalFrameSent_BrokerClosesConnection() throws Exception
-    {
-        try(Socket socket = new Socket())
-        {
-            socket.setSoTimeout(5000);
-
-
-            final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
-            LOGGER.debug("Making connection to {}", inetSocketAddress);
-
-            socket.connect(inetSocketAddress);
-
-            assertTrue("Expected socket to be connected", socket.isConnected());
-
-            final InputStream inputStream = socket.getInputStream();
-
-            // write header
-            TestSender sender = new TestSender(socket.getOutputStream());
-            sender.send(QpidByteBuffer.wrap(getHeaderBytesForBrokerVersion()));
-            sender.flush();
-
-            // reader header
-            byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
-            int len = inputStream.read(receivedHeader);
-            assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
-
-            sender.send(QpidByteBuffer.wrap("NOTANAMQPFRAME".getBytes()));
-
-        }
-    }
-
-    public void testProtocolNegotiationFromUnsupportedVersion() throws Exception
-    {
-        Protocol testProtocol = getBrokerProtocol();
-        String testSupportedProtocols = System.getProperty("test.amqp_port_protocols");
-        if (testSupportedProtocols!= null)
-        {
-            Set<Protocol> availableProtocols = new HashSet<>();
-            List<Object> protocols = new ObjectMapper().readValue(testSupportedProtocols, List.class);
-            for (Object protocol : protocols)
-            {
-                availableProtocols.add(Protocol.valueOf(String.valueOf(protocol)));
-            }
-            availableProtocols.remove(testProtocol);
-
-            for (Protocol protocol: availableProtocols)
-            {
-                String version = protocol.name().substring(5).replace('_', '-');
-                LOGGER.debug("Negotiation version {} represented as {}", protocol.name(), version);
-                setTestSystemProperty(ClientProperties.AMQP_VERSION, version);
-                AMQConnection connection = (AMQConnection)getConnection();
-                LOGGER.debug("Negotiated version {}", connection.getProtocolVersion());
-                assertEquals("Unexpected version negotiated: " + connection.getProtocolVersion(), convertProtocolToProtocolVersion(_expectedProtocolInit).toString(), connection.getProtocolVersion().toString());
-                connection.close();
-            }
-        }
-    }
-
-    private boolean writeHeartbeat(final TestSender sender)
-            throws IOException
-    {
-        if (isBroker010())
-        {
-            ConnectionHeartbeat heartbeat = new ConnectionHeartbeat();
-            ServerDisassembler serverDisassembler = new ServerDisassembler(sender, Frame.HEADER_SIZE + 1);
-            serverDisassembler.command(null, heartbeat);
-            serverDisassembler.closed();
-        }
-        else if(isBrokerPre010())
-        {
-            HeartbeatBody.FRAME.writePayload(sender);
-        }
-
-        return sender.hasSuccess();
-    }
-
-    private ProtocolVersion convertProtocolToProtocolVersion(final Protocol p)
-    {
-        final ProtocolVersion protocolVersion;
-        switch(p)
-        {
-            case AMQP_1_0:
-                protocolVersion = null;
-                break;
-            case AMQP_0_10:
-                protocolVersion = ProtocolVersion.v0_10;
-                break;
-            case AMQP_0_9_1:
-                protocolVersion = ProtocolVersion.v0_91;
-                break;
-            case AMQP_0_9:
-                protocolVersion = ProtocolVersion.v0_9;
-                break;
-            case AMQP_0_8:
-                protocolVersion = ProtocolVersion.v0_8;
-                break;
-            default:
-                throw new IllegalArgumentException("Unexpected " + p.name());
-        }
-        return protocolVersion;
-    }
-
-    private static class TestSender implements ByteBufferSender
-    {
-        private final OutputStream _output;
-        private boolean _success = true;
-
-
-        private TestSender(final OutputStream output)
-        {
-            _output = output;
-        }
-
-        @Override
-        public boolean isDirectBufferPreferred()
-        {
-            return false;
-        }
-
-        @Override
-        public void send(final QpidByteBuffer msg)
-        {
-            byte[] data = new byte[msg.remaining()];
-            msg.get(data);
-            try
-            {
-                _output.write(data);
-            }
-            catch (IOException e)
-            {
-                _success = false;
-            }
-
-        }
-
-        public boolean hasSuccess()
-        {
-            return _success;
-        }
-
-        @Override
-        public void flush()
-        {
-
-        }
-
-        @Override
-        public void close()
-        {
-
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 5333c7a..bc45318 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -35,6 +35,10 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -49,8 +53,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.SystemLauncher;
 import org.apache.qpid.server.SystemLauncherListener;
 import org.apache.qpid.server.logging.logback.LogbackLoggingSystemLauncherListener;
+import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Container;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
 import org.apache.qpid.server.model.ManageableMessage;
@@ -74,10 +78,11 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
     private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedBrokerPerClassAdminImpl.class);
     private final Map<String, Integer> _ports = new HashMap<>();
     private SystemLauncher _systemLauncher;
-    private Container<?> _broker;
+    private Broker<?> _broker;
     private VirtualHostNode<?> _currentVirtualHostNode;
     private String _currentWorkDirectory;
     private boolean _isPersistentStore;
+    private Map<String, String> _preservedContext;
 
     @Override
     public void beforeTestClass(final Class testClass)
@@ -90,12 +95,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
             _currentWorkDirectory = Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName())).toString();
 
             Map<String, String> context = new HashMap<>();
+            context.put("qpid.work_dir", _currentWorkDirectory);
+            context.put("qpid.port.protocol_handshake_timeout", "1000000");
             context.putAll(Arrays.stream((ConfigItem[]) testClass.getAnnotationsByType(ConfigItem.class))
                                  .collect(Collectors.toMap(ConfigItem::name,
                                                            ConfigItem::value,
                                                            (name, value) -> value)));
-            context.put("qpid.work_dir", _currentWorkDirectory);
-            context.put("qpid.port.protocol_handshake_timeout", "1000000");
 
             Map<String,Object> systemConfigAttributes = new HashMap<>();
             //systemConfigAttributes.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:config-protocol-tests.json");
@@ -458,7 +463,7 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
                 throw new IllegalStateException("System config is required");
             }
 
-            _broker = _systemConfig.getContainer();
+            _broker = (Broker<?>) _systemConfig.getContainer();
             Collection<Port> ports = _broker.getChildren(Port.class);
             for (Port port : ports)
             {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index f63c003..2d24142 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -81,10 +81,6 @@ org.apache.qpid.systest.rest.*
 org.apache.qpid.systest.rest.acl.*
 
 
-// CPP Broker does not timeout connections with no activity like the Qpid Broker-J
-org.apache.qpid.transport.ProtocolNegotiationTest#testNoProtocolHeaderSent_BrokerClosesConnection
-org.apache.qpid.transport.ProtocolNegotiationTest#testNoConnectionOpenSent_BrokerClosesConnection
-
 // QPID-6000 : Tests Qpid Broker-J specific message compression functionality, and uses the REST API to test it
 org.apache.qpid.systest.rest.MessageContentCompressionRestTest#*
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 5667963..3128524 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -33,9 +33,6 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnExclusiveQu
 // This test is checking features of the 0-x client specific implementation of Session
 org.apache.qpid.test.unit.client.AMQSessionTest#*
 
-// This test is concerned with the 0-x client establishing a connection to a supported version
-org.apache.qpid.transport.ProtocolNegotiationTest#testProtocolNegotiationFromUnsupportedVersion
-
 // Message compression not currently supported by the 1.0 client
 org.apache.qpid.systest.rest.MessageContentCompressionRestTest#*
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org