You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/02/12 16:11:48 UTC
[3/3] qpid-broker-j git commit: QPID-8038: [Broker-J] Add AMQP 1.0
idle and protocol header timeout protocol tests,
eliminating the old systems tests built using pieces of the old client
QPID-8038: [Broker-J] Add AMQP 1.0 idle and protocol header timeout protocol tests, eliminating the old systems tests built using pieces of the old client
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/91eb1cb2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/91eb1cb2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/91eb1cb2
Branch: refs/heads/master
Commit: 91eb1cb224e3cbff8646aa81e08114f01fa1e936
Parents: cc8b3c0
Author: Keith Wall <kw...@apache.org>
Authored: Mon Feb 12 13:54:58 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Feb 12 13:55:49 2018 +0000
----------------------------------------------------------------------
.../ProtocolHeaderTimeoutTest.java | 86 ++++
.../transport/connection/IdleTimeoutTest.java | 100 +++++
.../qpid/transport/ProtocolNegotiationTest.java | 401 -------------------
.../utils/EmbeddedBrokerPerClassAdminImpl.java | 15 +-
test-profiles/CPPExcludes | 4 -
test-profiles/Java10Excludes | 3 -
6 files changed, 196 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
new file mode 100644
index 0000000..6402ad0
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/protocoltimeout/ProtocolHeaderTimeoutTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.protocoltimeout;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assume.assumeThat;
+
+import java.lang.reflect.Array;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import com.google.common.io.CharStreams;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.ProtocolVersion;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
+import org.apache.qpid.server.protocol.v0_8.transport.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
+import org.apache.qpid.server.transport.ByteBufferSender;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@ConfigItem(name = AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT, value = "500")
+public class ProtocolHeaderTimeoutTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ public void noProtocolHeader() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ transport.assertNoMoreResponsesAndChannelClosed();
+ }
+ }
+
+ @Test
+ public void incompleteProtocolHeader() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+
+ final byte[] protocolHeader = transport.getProtocolHeader();
+ byte[] buf = new byte[1];
+ for(int i = 0 ; i < (protocolHeader.length - 1 ); i++)
+ {
+ System.arraycopy(protocolHeader, i, buf, 0, 1);
+ transport.sendBytes(buf);
+ transport.flush();
+ }
+ transport.assertNoMoreResponsesAndChannelClosed();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java
new file mode 100644
index 0000000..bcfa4a5
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/IdleTimeoutTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.connection;
+
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Test;
+
+import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.EmptyResponse;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
+import org.apache.qpid.tests.utils.ConfigItem;
+
+@BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
+@ConfigItem(name = AmqpPort.HEART_BEAT_DELAY, value = IdleTimeoutTest.IDLE_SECONDS)
+public class IdleTimeoutTest extends BrokerAdminUsingTestBase
+{
+ static final String IDLE_SECONDS = "1";
+ private static final int IDLE_TIMEOUT_MS = Integer.parseInt(IDLE_SECONDS) * 1000;
+
+ @Test
+ @SpecificationTest(section = "2.4.5",
+ description = "If the [idle timeout threshold] threshold is exceeded, then a peer SHOULD try to"
+ + "gracefully close the connection using a close frame with an error explaining why.")
+ public void brokerClosesIdleConnection() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction();
+ Open responseOpen = interaction
+ .negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+ assertThat(responseOpen.getIdleTimeOut().intValue(), is(equalTo(IDLE_TIMEOUT_MS)));
+
+ // TODO: defect - broker ought to be sending a close performative but it just closes the socket.
+ interaction.consumeResponse().getLatestResponse(ChannelClosedResponse.class);
+ }
+ }
+ @Test
+ @SpecificationTest(section = "2.4.5",
+ description = "If a peer needs to satisfy the need to send traffic to prevent idle timeout, and has "
+ + "nothing to send, it MAY send an empty frame.")
+ public void idleLine() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction();
+ Open responseOpen = interaction
+ .negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openIdleTimeOut(IDLE_TIMEOUT_MS)
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+ assertThat(responseOpen.getIdleTimeOut().intValue(), is(equalTo(IDLE_TIMEOUT_MS)));
+
+ // Reflect the broker's empty frames
+ interaction.consumeResponse(EmptyResponse.class)
+ .emptyFrame();
+
+ interaction.consumeResponse(EmptyResponse.class)
+ .emptyFrame();
+
+ interaction.doCloseConnection();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java b/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
deleted file mode 100644
index f25bdd6..0000000
--- a/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
+++ /dev/null
@@ -1,401 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.transport;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.configuration.ClientProperties;
-import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody;
-import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation;
-import org.apache.qpid.server.protocol.ProtocolVersion;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Port;
-import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.server.model.port.AmqpPort;
-import org.apache.qpid.server.protocol.v0_10.ServerDisassembler;
-import org.apache.qpid.server.util.SystemUtils;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestBrokerConfiguration;
-import org.apache.qpid.server.protocol.v0_10.transport.Frame;
-import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
-import org.apache.qpid.server.transport.ByteBufferSender;
-
-public class ProtocolNegotiationTest extends QpidBrokerTestCase
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolNegotiationTest.class);
- private static final int SO_TIMEOUT = 5000;
- public static final int AMQP_HEADER_LEN = 8;
- private Protocol _expectedProtocolInit;
-
- private static final Map<Protocol, List<List<Byte>>> HEADERS = new HashMap<>();
-
- static
- {
- HEADERS.put(Protocol.AMQP_0_8, Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)8, (byte)0)));
- HEADERS.put(Protocol.AMQP_0_9, Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)0, (byte)9)));
- HEADERS.put(Protocol.AMQP_0_9_1, Collections.singletonList(Arrays.asList((byte)0, (byte)0, (byte)9, (byte)1)));
- HEADERS.put(Protocol.AMQP_0_10, Collections.singletonList(Arrays.asList((byte)1, (byte)1, (byte)0, (byte)10)));
- HEADERS.put(Protocol.AMQP_1_0, Arrays.asList(Arrays.asList((byte)3, (byte)1, (byte)0, (byte)0),
- Arrays.asList((byte)0, (byte)1, (byte)0, (byte)0)));
-
- }
-
- @Override
- public void setUp() throws Exception
- {
- // restrict broker to support only single protocol
- TestBrokerConfiguration config = getDefaultBrokerConfiguration();
- config.setObjectAttribute(Port.class,
- TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
- Port.PROTOCOLS,
- Arrays.asList(getBrokerProtocol()));
- Map<String,String> overriddenPortContext = new HashMap<>();
- overriddenPortContext.put(AmqpPort.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null);
- overriddenPortContext.put(AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT, String.valueOf(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT));
- config.setObjectAttribute(Port.class,
- TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT,
- Port.CONTEXT,
- overriddenPortContext);
- config.setBrokerAttribute(Broker.CONTEXT,
- Collections.singletonMap(AmqpPort.PROPERTY_DEFAULT_SUPPORTED_PROTOCOL_REPLY, null));
-
- super.setUp();
- _expectedProtocolInit = getBrokerProtocol();
- }
-
- public void testWrongProtocolHeaderSent_BrokerRespondsWithSupportedProtocol() throws Exception
- {
- try(Socket socket = new Socket())
- {
- socket.setSoTimeout(SO_TIMEOUT);
-
- final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
- LOGGER.debug("Making connection to {}", inetSocketAddress);
-
- socket.connect(inetSocketAddress);
-
- assertTrue("Expected socket to be connected", socket.isConnected());
-
- socket.getOutputStream().write("NOTANAMQPHEADER".getBytes());
- byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
- int len = socket.getInputStream().read(receivedHeader);
- assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
- assertEquals("Expected end-of-stream from socket signifying socket closed)",
- -1,
- socket.getInputStream().read());
-
- ProtocolInitiation protocolInitiation = new ProtocolInitiation(QpidByteBuffer.wrap(receivedHeader));
-
- assertTrue("Unexpected protocol initialisation", matchedExpectedVersion(receivedHeader));
- }
- }
-
- private boolean matchedExpectedVersion(byte[] header)
- {
- if(header[0] != 'A' || header[1] != 'M' || header[2] != 'Q' || header[3] != 'P')
- {
- return false;
- }
- List<Byte> version = new ArrayList<>();
- for(int i = 4; i<8; i++)
- {
- version.add(header[i]);
- }
- return HEADERS.get(_expectedProtocolInit).contains(version);
- }
-
- public void testNoProtocolHeaderSent_BrokerClosesConnection() throws Exception
- {
- try(Socket socket = new Socket())
- {
- socket.setSoTimeout(SO_TIMEOUT);
-
- final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
- LOGGER.debug("Making connection to {}", inetSocketAddress);
-
- socket.connect(inetSocketAddress);
-
- assertTrue("Expected socket to be connected", socket.isConnected());
-
- int c = 0;
- try
- {
- c = socket.getInputStream().read();
- LOGGER.debug("Read {}", c);
-
- }
- catch(SocketTimeoutException ste)
- {
- fail("Broker did not close connection with no activity within expected timeout");
- }
-
- assertEquals("Expected end-of-stream from socket signifying socket closed)", -1, c);
- }
- }
-
- public void testNoConnectionOpenSent_BrokerClosesConnection() throws Exception
- {
- setSystemProperty(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY, "1000");
-
- try(Socket socket = new Socket())
- {
- socket.setSoTimeout(5000);
-
- byte[] header = getHeaderBytesForBrokerVersion();
-
- final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
- LOGGER.debug("Making connection to {}", inetSocketAddress);
-
- socket.connect(inetSocketAddress);
-
- assertTrue("Expected socket to be connected", socket.isConnected());
-
- OutputStream outputStream = socket.getOutputStream();
- final TestSender sender = new TestSender(outputStream);
- final InputStream inputStream = socket.getInputStream();
-
- // write header
- sender.send(QpidByteBuffer.wrap(header));
- sender.flush();
-
- // reader header
- byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
- int len = inputStream.read(receivedHeader);
- assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
-
- // read the server start / sasl mechanisms
- inputStream.read(new byte[1024]);
-
- // Send heartbeat frames to simulate a client that, although active, fails to
- // authenticate within the allowed period
-
- long timeout = System.currentTimeMillis() + 3000;
- boolean brokenPipe = false;
- while (timeout > System.currentTimeMillis())
- {
- if (!writeHeartbeat(sender))
- {
- brokenPipe = true;
- break;
- }
- Thread.sleep(100);
- }
- // If AMQP 1.0 we won't have sent anything (heartbeats not valid until after SASL)
- // Windows also seems to allow writes to a socket which has actually been closed.
- if(!brokenPipe && (SystemUtils.isWindows() || isBroker10()))
- {
- final int read = inputStream.read(new byte[10]);
- brokenPipe = -1 == read;
- }
- assertTrue("Expected pipe to become broken within "
- + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " timeout", brokenPipe);
- }
- }
-
- private byte[] getHeaderBytesForBrokerVersion()
- {
- byte[] header = new byte[8];
- header[0] = 'A';
- header[1] = 'M';
- header[2] = 'Q';
- header[3] = 'P';
- List<Byte> version = HEADERS.get(getBrokerProtocol()).iterator().next();
- int i = 4;
- for(byte b : version)
- {
- header[i++] = b;
- }
- return header;
- }
-
- public void testIllegalFrameSent_BrokerClosesConnection() throws Exception
- {
- try(Socket socket = new Socket())
- {
- socket.setSoTimeout(5000);
-
-
- final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getDefaultAmqpPort());
- LOGGER.debug("Making connection to {}", inetSocketAddress);
-
- socket.connect(inetSocketAddress);
-
- assertTrue("Expected socket to be connected", socket.isConnected());
-
- final InputStream inputStream = socket.getInputStream();
-
- // write header
- TestSender sender = new TestSender(socket.getOutputStream());
- sender.send(QpidByteBuffer.wrap(getHeaderBytesForBrokerVersion()));
- sender.flush();
-
- // reader header
- byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
- int len = inputStream.read(receivedHeader);
- assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
-
- sender.send(QpidByteBuffer.wrap("NOTANAMQPFRAME".getBytes()));
-
- }
- }
-
- public void testProtocolNegotiationFromUnsupportedVersion() throws Exception
- {
- Protocol testProtocol = getBrokerProtocol();
- String testSupportedProtocols = System.getProperty("test.amqp_port_protocols");
- if (testSupportedProtocols!= null)
- {
- Set<Protocol> availableProtocols = new HashSet<>();
- List<Object> protocols = new ObjectMapper().readValue(testSupportedProtocols, List.class);
- for (Object protocol : protocols)
- {
- availableProtocols.add(Protocol.valueOf(String.valueOf(protocol)));
- }
- availableProtocols.remove(testProtocol);
-
- for (Protocol protocol: availableProtocols)
- {
- String version = protocol.name().substring(5).replace('_', '-');
- LOGGER.debug("Negotiation version {} represented as {}", protocol.name(), version);
- setTestSystemProperty(ClientProperties.AMQP_VERSION, version);
- AMQConnection connection = (AMQConnection)getConnection();
- LOGGER.debug("Negotiated version {}", connection.getProtocolVersion());
- assertEquals("Unexpected version negotiated: " + connection.getProtocolVersion(), convertProtocolToProtocolVersion(_expectedProtocolInit).toString(), connection.getProtocolVersion().toString());
- connection.close();
- }
- }
- }
-
- private boolean writeHeartbeat(final TestSender sender)
- throws IOException
- {
- if (isBroker010())
- {
- ConnectionHeartbeat heartbeat = new ConnectionHeartbeat();
- ServerDisassembler serverDisassembler = new ServerDisassembler(sender, Frame.HEADER_SIZE + 1);
- serverDisassembler.command(null, heartbeat);
- serverDisassembler.closed();
- }
- else if(isBrokerPre010())
- {
- HeartbeatBody.FRAME.writePayload(sender);
- }
-
- return sender.hasSuccess();
- }
-
- private ProtocolVersion convertProtocolToProtocolVersion(final Protocol p)
- {
- final ProtocolVersion protocolVersion;
- switch(p)
- {
- case AMQP_1_0:
- protocolVersion = null;
- break;
- case AMQP_0_10:
- protocolVersion = ProtocolVersion.v0_10;
- break;
- case AMQP_0_9_1:
- protocolVersion = ProtocolVersion.v0_91;
- break;
- case AMQP_0_9:
- protocolVersion = ProtocolVersion.v0_9;
- break;
- case AMQP_0_8:
- protocolVersion = ProtocolVersion.v0_8;
- break;
- default:
- throw new IllegalArgumentException("Unexpected " + p.name());
- }
- return protocolVersion;
- }
-
- private static class TestSender implements ByteBufferSender
- {
- private final OutputStream _output;
- private boolean _success = true;
-
-
- private TestSender(final OutputStream output)
- {
- _output = output;
- }
-
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- @Override
- public void send(final QpidByteBuffer msg)
- {
- byte[] data = new byte[msg.remaining()];
- msg.get(data);
- try
- {
- _output.write(data);
- }
- catch (IOException e)
- {
- _success = false;
- }
-
- }
-
- public boolean hasSuccess()
- {
- return _success;
- }
-
- @Override
- public void flush()
- {
-
- }
-
- @Override
- public void close()
- {
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
index 5333c7a..bc45318 100644
--- a/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/systests-utils/src/main/java/org/apache/qpid/tests/utils/EmbeddedBrokerPerClassAdminImpl.java
@@ -35,6 +35,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -49,8 +53,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.SystemLauncher;
import org.apache.qpid.server.SystemLauncherListener;
import org.apache.qpid.server.logging.logback.LogbackLoggingSystemLauncherListener;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Container;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.ManageableMessage;
@@ -74,10 +78,11 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedBrokerPerClassAdminImpl.class);
private final Map<String, Integer> _ports = new HashMap<>();
private SystemLauncher _systemLauncher;
- private Container<?> _broker;
+ private Broker<?> _broker;
private VirtualHostNode<?> _currentVirtualHostNode;
private String _currentWorkDirectory;
private boolean _isPersistentStore;
+ private Map<String, String> _preservedContext;
@Override
public void beforeTestClass(final Class testClass)
@@ -90,12 +95,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
_currentWorkDirectory = Files.createTempDirectory(String.format("qpid-work-%s-%s-", timestamp, testClass.getSimpleName())).toString();
Map<String, String> context = new HashMap<>();
+ context.put("qpid.work_dir", _currentWorkDirectory);
+ context.put("qpid.port.protocol_handshake_timeout", "1000000");
context.putAll(Arrays.stream((ConfigItem[]) testClass.getAnnotationsByType(ConfigItem.class))
.collect(Collectors.toMap(ConfigItem::name,
ConfigItem::value,
(name, value) -> value)));
- context.put("qpid.work_dir", _currentWorkDirectory);
- context.put("qpid.port.protocol_handshake_timeout", "1000000");
Map<String,Object> systemConfigAttributes = new HashMap<>();
//systemConfigAttributes.put(SystemConfig.INITIAL_CONFIGURATION_LOCATION, "classpath:config-protocol-tests.json");
@@ -458,7 +463,7 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
throw new IllegalStateException("System config is required");
}
- _broker = _systemConfig.getContainer();
+ _broker = (Broker<?>) _systemConfig.getContainer();
Collection<Port> ports = _broker.getChildren(Port.class);
for (Port port : ports)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/test-profiles/CPPExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPExcludes b/test-profiles/CPPExcludes
index f63c003..2d24142 100755
--- a/test-profiles/CPPExcludes
+++ b/test-profiles/CPPExcludes
@@ -81,10 +81,6 @@ org.apache.qpid.systest.rest.*
org.apache.qpid.systest.rest.acl.*
-// CPP Broker does not timeout connections with no activity like the Qpid Broker-J
-org.apache.qpid.transport.ProtocolNegotiationTest#testNoProtocolHeaderSent_BrokerClosesConnection
-org.apache.qpid.transport.ProtocolNegotiationTest#testNoConnectionOpenSent_BrokerClosesConnection
-
// QPID-6000 : Tests Qpid Broker-J specific message compression functionality, and uses the REST API to test it
org.apache.qpid.systest.rest.MessageContentCompressionRestTest#*
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/91eb1cb2/test-profiles/Java10Excludes
----------------------------------------------------------------------
diff --git a/test-profiles/Java10Excludes b/test-profiles/Java10Excludes
index 5667963..3128524 100644
--- a/test-profiles/Java10Excludes
+++ b/test-profiles/Java10Excludes
@@ -33,9 +33,6 @@ org.apache.qpid.server.logging.ChannelLoggingTest#testChannelClosedOnExclusiveQu
// This test is checking features of the 0-x client specific implementation of Session
org.apache.qpid.test.unit.client.AMQSessionTest#*
-// This test is concerned with the 0-x client establishing a connection to a supported version
-org.apache.qpid.transport.ProtocolNegotiationTest#testProtocolNegotiationFromUnsupportedVersion
-
// Message compression not currently supported by the 1.0 client
org.apache.qpid.systest.rest.MessageContentCompressionRestTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org