You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/04 19:06:55 UTC

[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-x/1.0] Add heartbeating/idle tests to protocol suites

QPID-8038: [Broker-J] [AMQP 0-x/1.0]  Add heartbeating/idle tests to protocol suites


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/99fa51f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/99fa51f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/99fa51f0

Branch: refs/heads/master
Commit: 99fa51f01cbd03e5712821bcdd782e59584c175f
Parents: a9e61c1
Author: Keith Wall <kw...@apache.org>
Authored: Thu Jan 4 10:13:29 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Thu Jan 4 12:15:45 2018 +0000

----------------------------------------------------------------------
 .../protocol/v0_10/ConnectionInteraction.java   |  14 +++
 .../tests/protocol/v0_10/ConnectionTest.java    |  82 ++++++++++++++
 .../protocol/v0_8/ConnectionInteraction.java    |  16 +++
 .../protocol/v0_8/ExchangeInteraction.java      |   6 +
 .../qpid/tests/protocol/v0_8/Interaction.java   |   6 +
 .../tests/protocol/v0_8/ConnectionTest.java     | 111 +++++++++++++++++++
 .../qpid/tests/protocol/v1_0/EmptyResponse.java |  31 ++++++
 .../qpid/tests/protocol/v1_0/FrameDecoder.java  |   4 +
 .../qpid/tests/protocol/v1_0/Interaction.java   |  20 ++++
 .../v1_0/transport/connection/OpenTest.java     |  55 +++++++++
 10 files changed, 345 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
index d7b54b0..56d7498 100644
--- a/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-10/src/main/java/org/apache/qpid/tests/protocol/v0_10/ConnectionInteraction.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v0_10;
 
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpen;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStartOk;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionTuneOk;
@@ -33,6 +34,7 @@ public class ConnectionInteraction
     private ConnectionStartOk _startOk;
     private ConnectionTuneOk _tuneOk;
     private ConnectionOpen _open;
+    private ConnectionHeartbeat _connectionHeartbeat;
 
     public ConnectionInteraction(final Interaction interaction)
     {
@@ -40,6 +42,7 @@ public class ConnectionInteraction
         _startOk = new ConnectionStartOk();
         _tuneOk = new ConnectionTuneOk();
         _open = new ConnectionOpen();
+        _connectionHeartbeat = new ConnectionHeartbeat();
     }
 
     public Interaction startOk() throws Exception
@@ -69,6 +72,12 @@ public class ConnectionInteraction
         return this;
     }
 
+    public ConnectionInteraction tuneOkHeartbeat(final int heartbeat)
+    {
+        _tuneOk.setHeartbeat(heartbeat);
+        return this;
+    }
+
     public ConnectionInteraction tuneOkMaxFrameSize(final int maxFrameSize)
     {
         _tuneOk.setMaxFrameSize(maxFrameSize);
@@ -80,4 +89,9 @@ public class ConnectionInteraction
         _startOk.setResponse(response);
         return this;
     }
+
+    public Interaction heartbeat() throws Exception
+    {
+        return _interaction.sendPerformative(_connectionHeartbeat);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
index d273f02..1a800f6 100644
--- a/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-10/src/test/java/org/apache/qpid/tests/protocol/v0_10/ConnectionTest.java
@@ -20,8 +20,12 @@
  */
 package org.apache.qpid.tests.protocol.v0_10;
 
+import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -30,6 +34,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionClose;
+import org.apache.qpid.server.protocol.v0_10.transport.ConnectionHeartbeat;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionOpenOk;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionSecure;
 import org.apache.qpid.server.protocol.v0_10.transport.ConnectionStart;
@@ -181,4 +186,81 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
         }
     }
 
+    @Test
+    @SpecificationTest(section = "9.connection",
+            description = "The heartbeat control may be used to generate artificial network traffic when a connection "
+                          + "is idle.")
+    public void heartbeating() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+                                                 .consumeResponse(ConnectionStart.class)
+                                                 .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            assumeThat(response.hasHeartbeatMin(), is(true));
+            assumeThat(response.hasHeartbeatMax(), is(true));
+            assumeThat(response.getHeartbeatMin(), is(greaterThanOrEqualTo(0)));
+            assumeThat(response.getHeartbeatMax(), is(greaterThanOrEqualTo(1)));
+
+            final int heartbeatPeriod = 1;
+
+            interaction.connection()
+                       .tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkMaxFrameSize(response.getMaxFrameSize())
+                       .tuneOkHeartbeat(heartbeatPeriod)
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionOpenOk.class);
+
+            final long startTime = System.currentTimeMillis();
+            interaction.consumeResponse().getLatestResponse(ConnectionHeartbeat.class);
+            final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
+            assertThat("Heartbeat not received within expected time frame",
+                       ((int)actualHeartbeatDelay / 1000),
+                       is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(heartbeatPeriod * 2))));
+            interaction.connection().heartbeat();
+
+            interaction.consumeResponse(ConnectionHeartbeat.class)
+                       .connection().heartbeat();
+        }
+    }
+
+
+    @Test
+    @SpecificationTest(section = "9.connection",
+            description = "If a connection is idle for more than twice the negotiated heartbeat delay, the peers MAY "
+                          + "be considered disconnected.")
+    public void heartbeatingIncomingIdle() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTune response = interaction.negotiateProtocol().consumeResponse()
+                                                 .consumeResponse(ConnectionStart.class)
+                                                 .connection().startOkMechanism(ConnectionInteraction.SASL_MECHANISM_ANONYMOUS).startOk()
+                                                 .consumeResponse().getLatestResponse(ConnectionTune.class);
+
+            assumeThat(response.hasHeartbeatMin(), is(true));
+            assumeThat(response.hasHeartbeatMax(), is(true));
+            assumeThat(response.getHeartbeatMin(), is(greaterThanOrEqualTo(0)));
+            assumeThat(response.getHeartbeatMax(), is(greaterThanOrEqualTo(1)));
+
+            final int heartbeatPeriod = 1;
+
+            interaction.connection()
+                       .tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkMaxFrameSize(response.getMaxFrameSize())
+                       .tuneOkHeartbeat(heartbeatPeriod)
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionOpenOk.class);
+
+            interaction.consumeResponse().getLatestResponse(ConnectionHeartbeat.class);
+
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
index 236c49a..3c4943e 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ConnectionInteraction.java
@@ -24,8 +24,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneOkBody;
@@ -43,6 +45,11 @@ public class ConnectionInteraction
     private int _tuneOkHeartbeat;
     private String _openVirtualHost;
 
+    private int _closeReplyCode = ErrorCodes.REPLY_SUCCESS;
+    private String _closeReplyText;
+    private int _closeClassId;
+    private int _closeMethodId;
+
     public ConnectionInteraction(final Interaction interaction)
     {
         _interaction = interaction;
@@ -106,4 +113,13 @@ public class ConnectionInteraction
                                                                     null,
                                                                     false));
     }
+
+    public Interaction close() throws Exception
+    {
+        return _interaction.sendPerformative(new ConnectionCloseBody(_interaction.getProtocolVersion(),
+                                                                     _closeReplyCode,
+                                                                     AMQShortString.valueOf(_closeReplyText),
+                                                                     _closeClassId,
+                                                                     _closeMethodId));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
index fedd481..b35a1a9 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -84,6 +84,12 @@ public class ExchangeInteraction
         return this;
     }
 
+    public ExchangeInteraction declareNoWait(final boolean noWait)
+    {
+        _declareNoWait = noWait;
+        return this;
+    }
+
     public ExchangeInteraction declareArguments(final Map<String,Object> args)
     {
         _declareArguments = args == null ? Collections.emptyMap() : new HashMap<>(args);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
index 5e89af8..bb38fa0 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/Interaction.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import org.apache.qpid.server.protocol.ProtocolVersion;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQBody;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
@@ -65,6 +66,11 @@ public class Interaction extends AbstractInteraction<Interaction>
         return _protocolHeader;
     }
 
+    public ProtocolVersion getProtocolVersion()
+    {
+        return ((FrameTransport) getTransport()).getProtocolVersion();
+    }
+
     public Interaction sendPerformative(final AMQBody amqBody) throws Exception
     {
         return sendPerformative(_channelId, amqBody);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
index 9c693a3..81dbb85 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ConnectionTest.java
@@ -20,9 +20,12 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 import java.net.InetSocketAddress;
 
@@ -31,10 +34,12 @@ import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionSecureBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionStartBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody;
+import org.apache.qpid.server.protocol.v0_8.transport.HeartbeatBody;
 import org.apache.qpid.tests.protocol.ChannelClosedResponse;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -246,4 +251,110 @@ public class ConnectionTest extends BrokerAdminUsingTestBase
                        .consumeResponse(ConnectionCloseBody.class, ChannelClosedResponse.class);
         }
     }
+
+    @Test
+    @SpecificationTest(section = "4.2.7",
+            description = "Heartbeat frames tell the recipient that the sender is still alive. The rate and timing of"
+                          + " heartbeat frames is negotiated during connection tuning.")
+    public void heartbeating() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     .consumeResponse(ConnectionStartBody.class)
+                                                     .connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            final Long heartbeatPeriod = 1L;
+
+            interaction.connection()
+                       .tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(response.getFrameMax())
+                       .tuneOkHeartbeat(heartbeatPeriod.intValue())
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionOpenOkBody.class);
+
+            final long startTime = System.currentTimeMillis();
+            interaction.consumeResponse().getLatestResponse(HeartbeatBody.class);
+            final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
+            assertThat("Heartbeat not received within expected time frame",
+                       actualHeartbeatDelay / 1000,
+                       is(both(greaterThanOrEqualTo(heartbeatPeriod)).and(lessThanOrEqualTo(heartbeatPeriod * 2))));
+            interaction.sendPerformative(new HeartbeatBody());
+
+            interaction.consumeResponse(HeartbeatBody.class)
+                       .sendPerformative(new HeartbeatBody());
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.2.7", description = "Any sent octet is a valid substitute for a heartbeat")
+    public void heartbeatingIncomingTrafficIsNonHeartbeat() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     .consumeResponse(ConnectionStartBody.class)
+                                                     .connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            final Long heartbeatPeriod = 1L;
+
+            interaction.connection()
+                       .tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(response.getFrameMax())
+                       .tuneOkHeartbeat(heartbeatPeriod.intValue())
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionOpenOkBody.class)
+                       .channel().open()
+                       .consumeResponse(ChannelOpenOkBody.class)
+                       .consumeResponse(HeartbeatBody.class)
+                       .exchange().declarePassive(true).declareNoWait(true).declare()
+                       .consumeResponse(HeartbeatBody.class)
+                       .sendPerformative(new HeartbeatBody())
+                       .exchange().declarePassive(true).declareNoWait(true).declare();
+
+            interaction.connection()
+                       .close()
+                       .consumeResponse().getLatestResponse(ConnectionCloseOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "4.2.7",
+            description = " If a peer detects no incoming traffic (i.e. received octets) for two heartbeat intervals "
+                          + "or longer, it should close the connection without following the Connection.Close/Close-Ok handshaking")
+    public void heartbeatingNoIncomingTraffic() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionTuneBody response = interaction.negotiateProtocol()
+                                                     .consumeResponse(ConnectionStartBody.class)
+                                                     .connection().startOkMechanism("ANONYMOUS")
+                                                     .startOk()
+                                                     .consumeResponse().getLatestResponse(ConnectionTuneBody.class);
+
+            final Long heartbeatPeriod = 1L;
+
+            interaction.connection()
+                       .tuneOkChannelMax(response.getChannelMax())
+                       .tuneOkFrameMax(response.getFrameMax())
+                       .tuneOkHeartbeat(heartbeatPeriod.intValue())
+                       .tuneOk()
+                       .connection().open()
+                       .consumeResponse(ConnectionOpenOkBody.class)
+                       .consumeResponse(HeartbeatBody.class);
+
+            // Do not reflect a heartbeat so incoming line will be silent thus
+            // requiring the broker to close the connection.
+            transport.assertNoMoreResponsesAndChannelClosed();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java
new file mode 100644
index 0000000..c233e28
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmptyResponse.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.apache.qpid.tests.protocol.Response;
+
+public class EmptyResponse implements Response<EmptyResponse>
+{
+    @Override
+    public EmptyResponse getBody()
+    {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 0c94ad7..a8ab32e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -241,6 +241,10 @@ public class FrameDecoder implements InputDecoder
                         resetInputHandlerAfterSaslOutcome();
                     }
                 }
+                else if (val == null)
+                {
+                    response = new EmptyResponse();
+                }
                 else
                 {
                     throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 7a372a7..03f496a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -82,6 +82,10 @@ import org.apache.qpid.tests.protocol.Response;
 
 public class Interaction extends AbstractInteraction<Interaction>
 {
+    private static final FrameBody EMPTY_FRAME = (channel, conn) -> {
+        throw new UnsupportedOperationException();
+    };
+
     private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>());
     private final Begin _begin;
     private final End _end;
@@ -269,6 +273,12 @@ public class Interaction extends AbstractInteraction<Interaction>
         return this;
     }
 
+    public Interaction openIdleTimeOut(final int idleTimeOut)
+    {
+        _open.setIdleTimeOut(UnsignedInteger.valueOf(idleTimeOut));
+        return this;
+    }
+
     public Interaction openProperties(final Map<Symbol, Object> properties)
     {
         _open.setProperties(properties);
@@ -1071,4 +1081,14 @@ public class Interaction extends AbstractInteraction<Interaction>
         return new InteractionTransactionalState(handle);
     }
 
+    ///////////
+    // Empty //
+    ///////////
+
+    public Interaction emptyFrame() throws Exception
+    {
+        sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO);
+        return this;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/99fa51f0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index ab570da..a744cb1 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -21,11 +21,13 @@
 package org.apache.qpid.tests.protocol.v1_0.transport.connection;
 
 import static org.hamcrest.CoreMatchers.both;
+import static org.hamcrest.CoreMatchers.either;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 
 import java.net.InetSocketAddress;
@@ -41,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.EmptyResponse;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 
@@ -94,6 +97,58 @@ public class OpenTest extends BrokerAdminUsingTestBase
     }
 
     @Test
+    @SpecificationTest(section = "2.4.5",
+            description = "Implementations MUST be prepared to handle empty frames arriving on any valid channel")
+    public void emptyFrame() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .openContainerId("testContainerId")
+                       .open().consumeResponse(Open.class)
+                       .emptyFrame()
+                       .doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.4.5",
+            description = "Connections are subject to an idle timeout threshold.")
+    public void idleTimeout() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr).connect())
+        {
+            Interaction interaction = transport.newInteraction();
+            final int idleTimeOut = 1000;
+            Open responseOpen = interaction.negotiateProtocol().consumeResponse()
+                                           .openContainerId("testContainerId")
+                                           .openIdleTimeOut(idleTimeOut)
+                                           .open().consumeResponse()
+                                           .getLatestResponse(Open.class);
+
+            final int peerIdleTimeOut = responseOpen.getIdleTimeOut().intValue();
+            assertThat(peerIdleTimeOut, is(either(equalTo(0)).or(greaterThanOrEqualTo(idleTimeOut))));
+
+            final long startTime = System.currentTimeMillis();
+            interaction.consumeResponse(EmptyResponse.class);
+            final long actualHeartbeatDelay = System.currentTimeMillis() - startTime;
+            assertThat("Empty frame not received within expected time frame",
+                       ((int)actualHeartbeatDelay / 1000),
+                       is(both(greaterThanOrEqualTo(peerIdleTimeOut)).and(lessThanOrEqualTo(peerIdleTimeOut * 2))));
+
+            if (peerIdleTimeOut > 0)
+            {
+                interaction.emptyFrame();
+            }
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
     @SpecificationTest(section = "2.4.1",
             description = "The open frame can only be sent on channel 0. §2.7.1: A peer that receives a channel number outside the supported range MUST close the connection with the framing-error error-code.")
     public void failOpenOnChannelNotZero() throws Exception


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org