You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/04 19:06:55 UTC
[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-x/1.0]
Add heartbeating/idle tests to protocol suites
QPID-8038: [Broker-J] [AMQP 0-x/1.0] Add heartbeating/idle tests to protocol suites
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/99fa51f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/99fa51f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/99fa51f0
Branch: refs/heads/master
Commit: 99fa51f01cbd03e5712821bcdd782e59584c175f
Parents: a9e61c1
Author: Keith Wall <kw...@apache.org>
Authored: Thu Jan 4 10:13:29 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Jan 4 12:15:45 2018 +0000
----------------------------------------------------------------------
.../protocol/v0_10/ConnectionInteraction.java | 14 +++
.../tests/protocol/v0_10/ConnectionTest.java | 82 ++++++++++++++
.../protocol/v0_8/ConnectionInteraction.java | 16 +++
.../protocol/v0_8/ExchangeInteraction.java | 6 +
.../qpid/tests/protocol/v0_8/Interaction.java | 6 +
.../tests/protocol/v0_8/ConnectionTest.java | 111 +++++++++++++++++++
.../qpid/tests/protocol/v1_0/EmptyResponse.java | 31 ++++++
.../qpid/tests/protocol/v1_0/FrameDecoder.java | 4 +
.../qpid/tests/protocol/v1_0/Interaction.java | 20 ++++
.../v1_0/transport/connection/OpenTest.java | 55 +++++++++
10 files changed, 345 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
index d7b54b0..56d7498 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.tests.protocol.v0_10;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
@@ -33,6 +34,7 @@ public class ConnectionInteraction
private ConnectionStartOk _startOk;
private ConnectionTuneOk _tuneOk;
private ConnectionOpen _open;
+ private ConnectionHeartbeat _connectionHeartbeat;
public ConnectionInteraction(final Interaction interaction)
{
@@ -40,6 +42,7 @@ public class ConnectionInteraction
_startOk = new ConnectionStartOk();
_tuneOk = new ConnectionTuneOk();
_open = new ConnectionOpen();
+ _connectionHeartbeat = new ConnectionHeartbeat();
}
public Interaction startOk() throws Exception
@@ -69,6 +72,12 @@ public class ConnectionInteraction
return this;
}
+ public ConnectionInteraction tuneOkHeartbeat(final int heartbeat)
+ {
+ _tuneOk.setHeartbeat(heartbeat);
+ return this;
+ }
+
public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize)
{
_tuneOk.setMaxFrameSize(maxFrameSize);
@@ -80,4 +89,9 @@ public class ConnectionInteraction
_startOk.setResponse(response);
return this;
}
+
+ public Interaction heartbeat() throws Exception
+ {
+ return _interaction.sendPerformative(_connectionHeartbeat);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
index d273f02..1a800f6 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -20,8 +20,12 @@
*/
package org.apache.qpid.tests.protocol.v0_10;
+import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
@@ -30,6 +34,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
@@ -181,4 +186,81 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
}
}
+ @Test
+ @SpecificationTest(section = "9.connection",
+ description = "The heartbeat control may be used to generate artificial network traffic when a connection "
+ + "is idle.")
+ public void heartbeating() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+ assumeThat(response.hasHeartbeatMin(), is(true));
+ assumeThat(response.hasHeartbeatMax(), is(true));
+ assumeThat(response.getHeartbeatMin(), is(greaterThanOrEqualTo(0)));
+ assumeThat(response.getHeartbeatMax(), is(greaterThanOrEqualTo(1)));
+
+ final int heartbeatPeriod = 1;
+
+ interaction.connection()
+ .tuneOkChannelMax(response.getChannelMax())
+ .tuneOkMaxFrameSize(response.getMaxFrameSize())
+ .tuneOkHeartbeat(heartbeatPeriod)
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOk.class);
+
+ final long startTime = System.currentTimeMillis();
+ interaction.consumeResponse().getLatestResponse(ConnectionHeartbeat.class);
+ final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
+ assertThat("Heartbeat not received within expected time frame",
+ ((int)actualHeartbeatDelay / 1000),
+ is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(heartbeatPeriod * 2))));
+ interaction.connection().heartbeat();
+
+ interaction.consumeResponse(ConnectionHeartbeat.class)
+ .connection().heartbeat();
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "9.connection",
+ description = "If a connection is idle for more than twice the negotiated heartbeat delay, the peers MAY "
+ + "be considered disconnected.")
+ public void heartbeatingIncomingIdle() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+ .consumeResponse(ConnectionStart.class)
+ .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+ .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+ assumeThat(response.hasHeartbeatMin(), is(true));
+ assumeThat(response.hasHeartbeatMax(), is(true));
+ assumeThat(response.getHeartbeatMin(), is(greaterThanOrEqualTo(0)));
+ assumeThat(response.getHeartbeatMax(), is(greaterThanOrEqualTo(1)));
+
+ final int heartbeatPeriod = 1;
+
+ interaction.connection()
+ .tuneOkChannelMax(response.getChannelMax())
+ .tuneOkMaxFrameSize(response.getMaxFrameSize())
+ .tuneOkHeartbeat(heartbeatPeriod)
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOk.class);
+
+ interaction.consumeResponse().getLatestResponse(ConnectionHeartbeat.class);
+
+ transport.assertNoMoreResponsesAndChannelClosed();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
index 236c49a..3c4943e 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -24,8 +24,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody;
@@ -43,6 +45,11 @@ public class ConnectionInteraction
private int _tuneOkHeartbeat;
private String _openVirtualHost;
+ private int _closeReplyCode = ErrorCodes.REPLY_SUCCESS;
+ private String _closeReplyText;
+ private int _closeClassId;
+ private int _closeMethodId;
+
public ConnectionInteraction(final Interaction interaction)
{
_interaction = interaction;
@@ -106,4 +113,13 @@ public class ConnectionInteraction
null,
false));
}
+
+ public Interaction close() throws Exception
+ {
+ return _interaction.sendPerformative(new ConnectionCloseBody(_interaction.getProtocolVersion(),
+ _closeReplyCode,
+ AMQShortString.valueOf(_closeReplyText),
+ _closeClassId,
+ _closeMethodId));
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
index fedd481..b35a1a9 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -84,6 +84,12 @@ public class ExchangeInteraction
return this;
}
+ public ExchangeInteraction declareNoWait(final boolean noWait)
+ {
+ _declareNoWait = noWait;
+ return this;
+ }
+
public ExchangeInteraction declareArguments(final Map<String,Object> args)
{
_declareArguments = args == null ? Collections.emptyMap() : new HashMap<>(args);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 5e89af8..bb38fa0 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
+import org.apache.qpid.server.protocol.ProtocolVersion;
import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
@@ -65,6 +66,11 @@ public class Interaction extends AbstractInteraction<Interaction>
return _protocolHeader;
}
+ public ProtocolVersion getProtocolVersion()
+ {
+ return ((FrameTransport) getTransport()).getProtocolVersion();
+ }
+
public Interaction sendPerformative(final AMQBody amqBody) throws Exception
{
return sendPerformative(_channelId, amqBody);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
index 9c693a3..81dbb85 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -20,9 +20,12 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
+import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import java.net.InetSocketAddress;
@@ -31,10 +34,12 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody;
import org.apache.qpid.tests.protocol.ChannelClosedResponse;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -246,4 +251,110 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
.consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
}
}
+
+ @Test
+ @SpecificationTest(section = "4.2.7",
+ description = "Heartbeat frames tell the recipient that the sender is still alive. The rate and timing of"
+ + " heartbeat frames is negotiated during connection tuning.")
+ public void heartbeating() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTuneBody response = interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+ final Long heartbeatPeriod = 1L;
+
+ interaction.connection()
+ .tuneOkChannelMax(response.getChannelMax())
+ .tuneOkFrameMax(response.getFrameMax())
+ .tuneOkHeartbeat(heartbeatPeriod.intValue())
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOkBody.class);
+
+ final long startTime = System.currentTimeMillis();
+ interaction.consumeResponse().getLatestResponse(HeartbeatBody.class);
+ final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
+ assertThat("Heartbeat not received within expected time frame",
+ actualHeartbeatDelay / 1000,
+ is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(heartbeatPeriod * 2))));
+ interaction.sendPerformative(new HeartbeatBody());
+
+ interaction.consumeResponse(HeartbeatBody.class)
+ .sendPerformative(new HeartbeatBody());
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.2.7", description = "Any sent octet is a valid substitute for a heartbeat")
+ public void heartbeatingIncomingTrafficIsNonHeartbeat() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTuneBody response = interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+ final Long heartbeatPeriod = 1L;
+
+ interaction.connection()
+ .tuneOkChannelMax(response.getChannelMax())
+ .tuneOkFrameMax(response.getFrameMax())
+ .tuneOkHeartbeat(heartbeatPeriod.intValue())
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOkBody.class)
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .consumeResponse(HeartbeatBody.class)
+ .exchange().declarePassive(true).declareNoWait(true).declare()
+ .consumeResponse(HeartbeatBody.class)
+ .sendPerformative(new HeartbeatBody())
+ .exchange().declarePassive(true).declareNoWait(true).declare();
+
+ interaction.connection()
+ .close()
+ .consumeResponse().getLatestResponse(ConnectionCloseOkBody.class);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.2.7",
+ description = " If a peer detects no incoming traffic (i.e. received octets) for two heartbeat intervals "
+ + "or longer, it should close the connection without following the Connection.Close/Close-Ok handshaking")
+ public void heartbeatingNoIncomingTraffic() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ConnectionTuneBody response = interaction.negotiateProtocol()
+ .consumeResponse(ConnectionStartBody.class)
+ .connection().startOkMechanism("ANONYMOUS")
+ .startOk()
+ .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+ final Long heartbeatPeriod = 1L;
+
+ interaction.connection()
+ .tuneOkChannelMax(response.getChannelMax())
+ .tuneOkFrameMax(response.getFrameMax())
+ .tuneOkHeartbeat(heartbeatPeriod.intValue())
+ .tuneOk()
+ .connection().open()
+ .consumeResponse(ConnectionOpenOkBody.class)
+ .consumeResponse(HeartbeatBody.class);
+
+ // Do not reflect a heartbeat so incoming line will be silent thus
+ // requiring the broker to close the connection.
+ transport.assertNoMoreResponsesAndChannelClosed();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java
new file mode 100644
index 0000000..c233e28
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.apache.qpid.tests.protocol.Response;
+
+public class EmptyResponse implements Response<EmptyResponse>
+{
+ @Override
+ public EmptyResponse getBody()
+ {
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 0c94ad7..a8ab32e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -241,6 +241,10 @@ public class FrameDecoder implements InputDecoder
resetInputHandlerAfterSaslOutcome();
}
}
+ else if (val == null)
+ {
+ response = new EmptyResponse();
+ }
else
{
throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 7a372a7..03f496a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -82,6 +82,10 @@ import org.apache.qpid.tests.protocol.Response;
public class Interaction extends AbstractInteraction<Interaction>
{
+ private static final FrameBody EMPTY_FRAME = (channel, conn) -> {
+ throw new UnsupportedOperationException();
+ };
+
private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Begin _begin;
private final End _end;
@@ -269,6 +273,12 @@ public class Interaction extends AbstractInteraction<Interaction>
return this;
}
+ public Interaction openIdleTimeOut(final int idleTimeOut)
+ {
+ _open.setIdleTimeOut(UnsignedInteger.valueOf(idleTimeOut));
+ return this;
+ }
+
public Interaction openProperties(final Map<Symbol, Object> properties)
{
_open.setProperties(properties);
@@ -1071,4 +1081,14 @@ public class Interaction extends AbstractInteraction<Interaction>
return new InteractionTransactionalState(handle);
}
+ ///////////
+ // Empty //
+ ///////////
+
+ public Interaction emptyFrame() throws Exception
+ {
+ sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO);
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index ab570da..a744cb1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -21,11 +21,13 @@
package org.apache.qpid.tests.protocol.v1_0.transport.connection;
import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.either;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import java.net.InetSocketAddress;
@@ -41,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.EmptyResponse;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
import org.apache.qpid.tests.protocol.SpecificationTest;
@@ -94,6 +97,58 @@ public class OpenTest extends BrokerAdminUsingTestBase
}
@Test
+ @SpecificationTest(section = "2.4.5",
+ description = "Implementations MUST be prepared to handle empty frames arriving on any valid channel")
+ public void emptyFrame() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .open().consumeResponse(Open.class)
+ .emptyFrame()
+ .doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.4.5",
+ description = "Connections are subject to an idle timeout threshold.")
+ public void idleTimeout() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ Interaction interaction = transport.newInteraction();
+ final int idleTimeOut = 1000;
+ Open responseOpen = interaction.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .openIdleTimeOut(idleTimeOut)
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+
+ final int peerIdleTimeOut = responseOpen.getIdleTimeOut().intValue();
+ assertThat(peerIdleTimeOut, is(either(equalTo(0)).or(greaterThanOrEqualTo(idleTimeOut))));
+
+ final long startTime = System.currentTimeMillis();
+ interaction.consumeResponse(EmptyResponse.class);
+ final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
+ assertThat("Empty frame not received within expected time frame",
+ ((int)actualHeartbeatDelay / 1000),
+ is(both(greaterThanOrEqualTo(peerIdleTimeOut)).and(lessThanOrEqualTo(peerIdleTimeOut * 2))));
+
+ if (peerIdleTimeOut > 0)
+ {
+ interaction.emptyFrame();
+ }
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
@SpecificationTest(section = "2.4.1",
description = "The open frame can only be sent on channel 0. ยง2.7.1: A peer that receives a channel number outside the supported range MUST close the connection with the framing-error error-code.")
public void failOpenOnChannelNotZero() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org