You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/03/30 22:48:54 UTC

[qpid-protonj2] branch main updated: PROTON-2702 Allow the server to accept a new connection

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new f5cdc0d3 PROTON-2702 Allow the server to accept a new connection
f5cdc0d3 is described below

commit f5cdc0d3329d47bd38d5436bedcc73f0d1f8dfaf
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Thu Mar 30 18:48:42 2023 -0400

    PROTON-2702 Allow the server to accept a new connection
    
    If the active connection drops or is disconnected allow the server to be
    scripted to accept a new client connection.
---
 .../qpid/protonj2/client/impl/ReconnectTest.java   |   4 +-
 .../qpid/protonj2/test/driver/AMQPTestDriver.java  |  15 ++-
 .../protonj2/test/driver/ProtonTestClient.java     |  19 ++-
 .../protonj2/test/driver/ProtonTestConnector.java  |  12 +-
 .../qpid/protonj2/test/driver/ProtonTestPeer.java  |  96 +++++++++++++--
 .../protonj2/test/driver/ProtonTestServer.java     |  58 ++++++++-
 .../test/driver/actions/ConnectionDropAction.java  |   4 +-
 ...tionDropAction.java => PeerShutdownAction.java} |  12 +-
 .../protonj2/test/driver/netty/NettyIOBuilder.java |   8 +-
 .../protonj2/test/driver/netty/NettyServer.java    | 130 ++++++++++++++++++---
 .../test/driver/netty/netty4/Netty4Server.java     |  37 +++++-
 .../test/driver/netty/netty5/Netty5Server.java     |  31 ++++-
 .../protonj2/test/driver/ProtonTestClientTest.java |  31 +++++
 13 files changed, 406 insertions(+), 51 deletions(-)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java
index 84adf200..28cf7789 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java
@@ -57,7 +57,7 @@ public class ReconnectTest extends ImperativeClientTestCase {
 
             firstPeer.expectSASLAnonymousConnect();
             firstPeer.expectOpen().withContainerId(any(String.class)).respond();
-            firstPeer.dropAfterLastHandler(5);
+            firstPeer.shutdownAfterLastHandler(5);
             firstPeer.start();
 
             finalPeer.expectSASLAnonymousConnect();
@@ -101,7 +101,7 @@ public class ReconnectTest extends ImperativeClientTestCase {
             finalPeer.waitForScriptToComplete();
             finalPeer.expectBegin().respond();
             finalPeer.expectEnd().respond();
-            finalPeer.dropAfterLastHandler(10);
+            finalPeer.shutdownAfterLastHandler(10);
 
             Session session = connection.openSession().openFuture().get();
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index a277de7a..9403e01d 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -253,6 +253,15 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         }
     }
 
+    void handleConnectedDropped() throws AssertionError {
+        synchronized (script) {
+            // For now we just reset the parser as any new connection would need to
+            // send an AMQP header, other validation could be added if we expand
+            // processing on client disconnect events.
+            frameParser.resetToExpectingHeader();
+        }
+    }
+
     void handleHeader(AMQPHeader header) throws AssertionError {
         synchronized (script) {
             final ScriptedElement scriptEntry = script.poll();
@@ -590,7 +599,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         final ByteBuffer output;
         final ByteBuffer buffer = frameEncoder.handleWrite(performative, channel, payload, null);
 
-        if (deferredWrites != null) {
+        if (!deferredWrites.isEmpty()) {
             deferredWrites.add(buffer);
             try {
                 output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
@@ -644,7 +653,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         try {
             final ByteBuffer buffer = frameEncoder.handleWrite(performative, channel);
 
-            if (deferredWrites != null) {
+            if (!deferredWrites.isEmpty()) {
                 deferredWrites.add(buffer);
                 try {
                     output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
@@ -674,7 +683,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
         try {
             final ByteBuffer output;
 
-            if (deferredWrites != null) {
+            if (!deferredWrites.isEmpty()) {
                 LOG.trace("{} appending deferred buffer {} to next write.", driverName, deferredWrites);
                 deferredWrites.add(ByteBuffer.wrap(header.getBuffer()).asReadOnlyBuffer());
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index b9eade1d..97c59e28 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -23,9 +23,9 @@ import java.util.function.Supplier;
 
 import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
 import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
-import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
 import org.apache.qpid.protonj2.test.driver.netty.NettyClient;
 import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
+import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,8 +63,8 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
                                                    this::processDriverAssertion,
                                                    this::eventLoop);
         this.client = NettyIOBuilder.createClient(options,
-                                             this::processConnectionEstablished,
-                                             this::processChannelInput);
+                                                  this::processConnectionEstablished,
+                                                  this::processChannelInput);
     }
 
     public void connect(String hostname, int port) throws IOException {
@@ -83,7 +83,13 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
     }
 
     @Override
-    protected void processCloseRequest() {
+    protected void processConnectionDropped() {
+        LOG.trace("AMQP Client connection to remote dropped.");
+        driver.handleConnectedDropped();
+    }
+
+    @Override
+    protected void processCloseConnectionRequest() {
         try {
             client.close();
         } catch (Throwable e) {
@@ -91,6 +97,11 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
         }
     }
 
+    @Override
+    protected void processPeerShutdownRequest() {
+        processCloseConnectionRequest(); // Same outcome as the client is single use only.
+    }
+
     @Override
     protected void processDriverOutput(ByteBuffer frame) {
         LOG.trace("AMQP Client Channel writing: {}", frame);
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
index 5375fe3f..fc577554 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
@@ -64,7 +64,12 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte
     //----- Internal implementation which can be overridden
 
     @Override
-    protected void processCloseRequest() {
+    protected void processCloseConnectionRequest() {
+        // nothing to do in this peer implementation.
+    }
+
+    @Override
+    protected void processPeerShutdownRequest() {
         // nothing to do in this peer implementation.
     }
 
@@ -77,4 +82,9 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte
     protected void processConnectionEstablished() {
         driver.handleConnectedEstablished();
     }
+
+    @Override
+    protected void processConnectionDropped() {
+        driver.handleConnectedDropped();
+    }
 }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
index 6dc4d1af..78d06448 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.protonj2.test.driver.actions.ConnectionDropAction;
+import org.apache.qpid.protonj2.test.driver.actions.PeerShutdownAction;
 
 /**
  * Abstract base class that is implemented by all the AMQP v1.0 test peer
@@ -38,43 +39,91 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
     @Override
     public void close() {
         if (closed.compareAndSet(false, true)) {
-            processCloseRequest();
+            processPeerShutdownRequest();
         }
     }
 
+    /**
+     * Disconnect any current active connection to this peer, or from this peer if it
+     * is a client type connection. Depending on the peer implementation this could be
+     * a terminal action. A server implementation should allow for a new connection to
+     * be made to it following dropping any existing client connection.
+     */
+    public void dropConnection() {
+        if (closed.compareAndSet(false, true)) {
+            processCloseConnectionRequest();
+        }
+    }
+
+    /**
+     * Waits for the test script to complete and ignores any errors seen. This method is
+     * generally only useful for scripts that complete other specific waits and then want
+     * to wait for a shutdown sequence without scripting an exact order of events.
+     */
     public void waitForScriptToCompleteIgnoreErrors() {
         getDriver().waitForScriptToCompleteIgnoreErrors();
     }
 
+    /**
+     * Waits for the configured test script to complete and will throw any errors
+     * from the expected script as {@link AssertionError} exceptions.
+     */
     public void waitForScriptToComplete() {
         getDriver().waitForScriptToComplete();
     }
 
+    /**
+     * Waits for the configured test script to complete and will throw any errors
+     * from the expected script as {@link AssertionError} exceptions.
+     *
+     * @param timeout
+     * 		The time to wait in milliseconds before failing the wait.
+     */
     public void waitForScriptToComplete(long timeout) {
         getDriver().waitForScriptToComplete(timeout);
     }
 
+    /**
+     * Waits for the configured test script to complete and will throw any errors
+     * from the expected script as {@link AssertionError} exceptions.
+     *
+     * @param timeout
+     * 		The time to wait before failing the wait.
+     * @param units
+     * 		The units to use for the given time interval.
+     */
     public void waitForScriptToComplete(long timeout, TimeUnit units) {
         getDriver().waitForScriptToComplete(timeout, units);
     }
 
+    /**
+     * @return the total number of AMQP idle frames that were received.
+     */
     public int getEmptyFrameCount() {
         return getDriver().getEmptyFrameCount();
     }
 
+    /**
+     * @return the total number of AMQP performatives that were read so far.
+     */
     public int getPerformativeCount() {
         return getDriver().getPerformativeCount();
     }
 
+    /**
+     * @return the total number of SASL performatives that were read so far.
+     */
     public int getSaslPerformativeCount() {
         return getDriver().getSaslPerformativeCount();
     }
 
     /**
      * Drops the connection to the connected client immediately after the last handler that was
-     * registered before this scripted action is queued.  Adding any additional test scripting to
-     * the test driver will either not be acted on or could cause the wait methods to not return
-     * as they will never be invoked.
+     * registered before this scripted action is queued. Depending on the test peer this action
+     * could be a terminal one meaning no other scripted elements could be added and doing so might
+     * result in wait for completion calls to hang. In general any server type peer should allow
+     * reconnects after dropping a client connection but a client peer would be expected not to
+     * allow any action following a drop.
      *
      * @return this test peer instance.
      */
@@ -85,9 +134,11 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
 
     /**
      * Drops the connection to the connected client immediately after the last handler that was
-     * registered before this scripted action is queued.  Adding any additional test scripting to
-     * the test driver will either not be acted on or could cause the wait methods to not return
-     * as they will never be invoked.
+     * registered before this scripted action is queued. Depending on the test peer this action
+     * could be a terminal one meaning no other scripted elements could be added and doing so might
+     * result in wait for completion calls to hang. In general any server type peer should allow
+     * reconnects after dropping a client connection but a client peer would be expected not to
+     * allow any action following a drop.
      *
      * @param delay
      *      The time in milliseconds to wait before running the action after the last handler is run.
@@ -99,14 +150,43 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
         return this;
     }
 
+    /**
+     * Shuts down the peer dropping any connections and rendering the peer not usable for
+     * any new actions to be added.
+     *
+     * @return this test peer instance.
+     */
+    public ProtonTestPeer shutdownAfterLastHandler() {
+        getDriver().addScriptedElement(new PeerShutdownAction(this));
+        return this;
+    }
+
+    /**
+     * Shuts down the peer dropping any connections and rendering the peer not usable for
+     * any new actions to be added.
+     *
+     * @param delay
+     *      The time in milliseconds to wait before running the action after the last handler is run.
+     *
+     * @return this test peer instance.
+     */
+    public ProtonTestPeer shutdownAfterLastHandler(int delay) {
+        getDriver().addScriptedElement(new PeerShutdownAction(this).afterDelay(delay));
+        return this;
+    }
+
     protected abstract String getPeerName();
 
-    protected abstract void processCloseRequest();
+    protected abstract void processCloseConnectionRequest();
+
+    protected abstract void processPeerShutdownRequest();
 
     protected abstract void processDriverOutput(ByteBuffer frame);
 
     protected abstract void processConnectionEstablished();
 
+    protected abstract void processConnectionDropped();
+
     protected void checkClosed() {
         if (closed.get()) {
             throw new IllegalStateException("The test peer is closed");
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index ad2c2884..b4ab29ce 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -23,10 +23,11 @@ import java.util.function.Supplier;
 
 import javax.net.ssl.SSLEngine;
 
+import org.apache.qpid.protonj2.test.driver.actions.ConnectionDropAction;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
 import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
-import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
 import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
+import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
 import org.apache.qpid.protonj2.test.driver.netty.NettyServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,10 +62,13 @@ public class ProtonTestServer extends ProtonTestPeer {
      *      The options that control the behavior of the deployed server.
      */
     public ProtonTestServer(ProtonTestServerOptions options) {
-        this.driver = new NettyAwareAMQPTestDriver(this::processDriverOutput, this::processDriverAssertion, this::eventLoop);
+        this.driver = new NettyAwareAMQPTestDriver(this::processDriverOutput,
+                                                   this::processDriverAssertion,
+                                                   this::eventLoop);
         this.server = NettyIOBuilder.createServer(options,
-                                             this::processConnectionEstablished,
-                                             this::processChannelInput);
+                                                  this::processConnectionEstablished,
+                                                  this::processConnectionDropped,
+                                                  this::processChannelInput);
     }
 
     /**
@@ -119,6 +123,35 @@ public class ProtonTestServer extends ProtonTestPeer {
         }
     }
 
+    /**
+     * Drops the connection to the connected client immediately after the last handler that was
+     * registered before this scripted action is queued.  Adding any additional test scripting to
+     * the test driver will either not be acted on or could cause the wait methods to not return
+     * as they will never be invoked.
+     *
+     * @return this test peer instance.
+     */
+    public ProtonTestPeer dropAfterLastHandler() {
+        getDriver().addScriptedElement(new ConnectionDropAction(this));
+        return this;
+    }
+
+    /**
+     * Drops the connection to the connected client immediately after the last handler that was
+     * registered before this scripted action is queued.  Adding any additional test scripting to
+     * the test driver will either not be acted on or could cause the wait methods to not return
+     * as they will never be invoked.
+     *
+     * @param delay
+     *      The time in milliseconds to wait before running the action after the last handler is run.
+     *
+     * @return this test peer instance.
+     */
+    public ProtonTestPeer dropAfterLastHandler(int delay) {
+        getDriver().addScriptedElement(new ConnectionDropAction(this).afterDelay(delay));
+        return this;
+    }
+
     public boolean isAcceptingConnections() {
         return server.isAcceptingConnections();
     }
@@ -250,7 +283,7 @@ public class ProtonTestServer extends ProtonTestPeer {
     //----- Internal implementation which can be overridden
 
     @Override
-    protected void processCloseRequest() {
+    protected void processPeerShutdownRequest() {
         try {
             server.stopAsync();
         } catch (Throwable e) {
@@ -258,6 +291,15 @@ public class ProtonTestServer extends ProtonTestPeer {
         }
     }
 
+    @Override
+    protected void processCloseConnectionRequest() {
+        try {
+            server.disconnectClient();
+        } catch (Throwable e) {
+            LOG.info("Error suppressed on server dropping connected client: ", e);
+        }
+    }
+
     @Override
     protected void processDriverOutput(ByteBuffer frame) {
         LOG.trace("AMQP Server Channel writing: {}", frame);
@@ -270,6 +312,12 @@ public class ProtonTestServer extends ProtonTestPeer {
         driver.handleConnectedEstablished();
     }
 
+    @Override
+    protected void processConnectionDropped() {
+        LOG.trace("AMQP Server reports client connection dropped.");
+        driver.handleConnectedDropped();
+    }
+
     protected void processDriverAssertion(AssertionError error) {
         LOG.trace("AMQP Server Closing due to error: {}", error.getMessage());
         close();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
index 45908f5b..7d08da65 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
@@ -38,8 +38,8 @@ public class ConnectionDropAction implements ScriptedAction {
 
     @Override
     public ScriptedAction now() {
-        LOG.info("Connection Drop Action closing test peer as scripted");
-        peer.close();
+        LOG.info("Connection Drop Action closing test peer connection as scripted");
+        peer.dropConnection();
         return this;
     }
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/PeerShutdownAction.java
similarity index 86%
copy from protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
copy to protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/PeerShutdownAction.java
index 45908f5b..1ddd674c 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/PeerShutdownAction.java
@@ -25,20 +25,20 @@ import org.slf4j.LoggerFactory;
 /**
  * Action that drops the netty connection to the remote once invoked.
  */
-public class ConnectionDropAction implements ScriptedAction {
+public class PeerShutdownAction implements ScriptedAction {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ConnectionDropAction.class);
+    private static final Logger LOG = LoggerFactory.getLogger(PeerShutdownAction.class);
 
     private final ProtonTestPeer peer;
     private int delay = -1;
 
-    public ConnectionDropAction(ProtonTestPeer peer) {
+    public PeerShutdownAction(ProtonTestPeer peer) {
         this.peer = peer;
     }
 
     @Override
     public ScriptedAction now() {
-        LOG.info("Connection Drop Action closing test peer as scripted");
+        LOG.info("Peer Shutdown Action closing test peer as scripted");
         peer.close();
         return this;
     }
@@ -67,7 +67,7 @@ public class ConnectionDropAction implements ScriptedAction {
 
                 @Override
                 public ScriptedAction perform(AMQPTestDriver driver) {
-                    return ConnectionDropAction.this.now();
+                    return PeerShutdownAction.this.now();
                 }
 
                 @Override
@@ -91,7 +91,7 @@ public class ConnectionDropAction implements ScriptedAction {
         return delay;
     }
 
-    public ConnectionDropAction afterDelay(int delay) {
+    public PeerShutdownAction afterDelay(int delay) {
         this.delay = delay;
         return this;
     }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java
index cfa728d9..39d77db9 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java
@@ -63,16 +63,18 @@ public interface NettyIOBuilder {
      * 		The {@link ProtonTestServerOptions} that configure the IO Transport the context creates.
      * @param connectedHandler
      * 		A handler that should be invoked when a connection attempt succeeds.
+     * @param disconnectedHandler
+     * 		A handler that should be invoked when a connection is closed or is dropped.
      * @param inputHandler
      * 		A {@link Consumer} that accept incoming {@link ByteBuffer} data from the remote.
      *
      * @return a new {@link NettyServer} from available options.
      */
-    public static NettyServer createServer(ProtonTestServerOptions options, Runnable connectedHandler, Consumer<ByteBuffer> inputHandler) {
+    public static NettyServer createServer(ProtonTestServerOptions options, Runnable connectedHandler, Runnable disconnectedHandler, Consumer<ByteBuffer> inputHandler) {
         if (Netty4Support.isAvailable()) {
-            return new Netty4Server(options, connectedHandler, inputHandler);
+            return new Netty4Server(options, connectedHandler, disconnectedHandler, inputHandler);
         } else if (Netty5Support.isAvailable()) {
-            return new Netty5Server(options, connectedHandler, inputHandler);
+            return new Netty5Server(options, connectedHandler, disconnectedHandler, inputHandler);
         }
 
         throw new UnsupportedOperationException("Netty not available on the class path");
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
index 4291bee1..99d775c8 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
@@ -23,49 +23,151 @@ import java.nio.ByteBuffer;
 import javax.net.ssl.SSLEngine;
 
 /**
- *
+ * Provides the API for a Netty based server instance used by the test peer.
  */
 public interface NettyServer extends AutoCloseable {
 
-    boolean isSecureServer();
+    /**
+     * Starts the server instance which ends the configuration phase and allows for
+     * incoming connections to be accepted.
+     *
+     * @throws Exception if an error occurs during server start.
+     */
+    void start() throws Exception;
 
-    boolean isAcceptingConnections();
+    /**
+     * Stops the server, waiting on termination of all server resources before
+     * the method returns.
+     *
+     * @throws InterruptedException
+     */
+    void stop() throws InterruptedException;
+
+    /**
+     * Stops the server and returns as soon as server shutdown has been
+     * successfully started but does not wait for all server resources to
+     * be fully shut down.
+     *
+     * @throws InterruptedException
+     */
+    void stopAsync() throws InterruptedException;
+
+    /**
+     * Close API that calls the blocking {@link #stop()} API and awaits the
+     * full server shutdown. Use this in try-with-resources style invocations.
+     */
+    @Override
+    void close() throws InterruptedException;
+
+    /**
+     * Disconnects any connected client and leaves the server in a state where a new
+     * client connection is possible.
+     *
+     * @throws Exception
+     */
+    void disconnectClient() throws Exception;
+
+    /**
+     * @return true if the server is using SSL as the transport layer.
+     */
+    boolean isSecureServer();
 
+    /**
+     * @return true if the server has an active SSL transport connection.
+     */
     boolean hasSecureConnection();
 
+    /**
+     * @return true if the server is in a state where connections will be accepted.
+     */
+    boolean isAcceptingConnections();
+
+    /**
+     * @return true if there is an active connection to the server.
+     */
     boolean hasClientConnection();
 
+    /**
+     * @return the remote port that the client connection is connected to.
+     */
     int getClientPort();
 
+    /**
+     * @return has the SSL handshake for a client completed successfully.
+     */
     boolean isPeerVerified();
 
+    /**
+     * @return the {@link SSLEngine} that was configured and assigned to this server.
+     */
     SSLEngine getConnectionSSLEngine();
 
+    /**
+     * @return is the server exposing a WebSocket endpoint.
+     */
     boolean isWebSocketServer();
 
+    /**
+     * @return the path that was configured on the WebSocket transport.
+     */
     String getWebSocketPath();
 
+    /**
+     * Assign the WebSocket path that server should use, which must be done
+     * prior to starting the server.
+     *
+     * @param webSocketPath
+     * 		The web socket path to use when accepting connections.
+     */
     void setWebSocketPath(String webSocketPath);
 
+    /**
+     * @return the configured max frame size for WebSocket connections.
+     */
     int getMaxFrameSize();
 
+    /**
+     * Assign the max frame size value to configure on WebSocket connection which
+     * must be assigned prior to starting the server.
+     *
+     * @param maxFrameSize
+     * 		The max frame size in bytes to assign to web socket connections.
+     */
     void setMaxFrameSize(int maxFrameSize);
 
+    /**
+     * Creates a connection URI for an AMQP client that provides the proper AMQP
+     * scheme format (amqp:// etc) along with the host and port values plus an optional
+     * query string provided by the caller.
+     *
+     * @param queryString
+     * 		optional query string to append to the returned URI
+     *
+     * @return a URI that a client could use to connect to the AMQP server.
+     *
+     * @throws Exception if an error occurs while creating the connection URI.
+     */
     URI getConnectionURI(String queryString) throws Exception;
 
-    void start() throws Exception;
-
-    void write(ByteBuffer frame);
-
+    /**
+     * Writes the given buffer to the IO layer.
+     *
+     * @param buffer
+     * 		The buffer to send to the remote connection.
+     */
+    void write(ByteBuffer buffer);
+
+    /**
+     * @return the event loop that the server processing runs within.
+     */
     NettyEventLoop eventLoop();
 
-    void stop() throws InterruptedException;
-
-    void stopAsync() throws InterruptedException;
-
-    @Override
-    void close() throws InterruptedException;
-
+    /**
+     * Gets the server port that new connections are accepted on, this should
+     * only be called after the {@link #start()} method has been called.
+     *
+     * @return the server port where connections should be made.
+     */
     int getServerPort();
 
 }
\ No newline at end of file
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
index cf7ba820..64df5b0a 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
@@ -101,15 +101,18 @@ public final class Netty4Server implements NettyServer {
 
     private final Consumer<ByteBuffer> inputConsumer;
     private final Runnable connectedRunnable;
+    private final Runnable disconnectedRunnable;
 
-    public Netty4Server(ProtonTestServerOptions options, Runnable connectedRunnable, Consumer<ByteBuffer> inputConsumer) {
+    public Netty4Server(ProtonTestServerOptions options, Runnable connectedRunnable, Runnable disconnectedRunnable, Consumer<ByteBuffer> inputConsumer) {
         Objects.requireNonNull(options);
         Objects.requireNonNull(inputConsumer);
         Objects.requireNonNull(connectedRunnable);
+        Objects.requireNonNull(disconnectedRunnable);
 
         this.options = options;
         this.connectedRunnable = connectedRunnable;
         this.inputConsumer = inputConsumer;
+        this.disconnectedRunnable = disconnectedRunnable;
     }
 
     @Override
@@ -248,7 +251,10 @@ public final class Netty4Server implements NettyServer {
                 @Override
                 public void initChannel(Channel ch) throws Exception {
                     // Don't accept any new connections.
-                    serverChannel.close();
+                    if (clientChannel != null) {
+                        throw new UnsupportedOperationException("Server cannot have more than one connected client at a time");
+                    }
+
                     // Now we know who the client is
                     clientChannel = ch;
 
@@ -286,6 +292,12 @@ public final class Netty4Server implements NettyServer {
                 ctx.fireChannelActive();
             }
 
+            @Override
+            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+                disconnectedRunnable.run();
+                ctx.fireChannelInactive();
+            }
+
             @Override
             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
                 LOG.trace("AMQP Test Server Channel read: {}", input);
@@ -341,6 +353,26 @@ public final class Netty4Server implements NettyServer {
         }
     }
 
+    @Override
+    public void disconnectClient() throws Exception { // closes the current client channel, if any; the server channel is not touched
+        if (!started.get() || !serverChannel.isOpen()) {
+            throw new IllegalStateException("Server must be currently active in order to reset");
+        }
+
+        if (clientChannel != null) {
+            try {
+                if (!clientChannel.close().await(10, TimeUnit.SECONDS)) { // bounded wait so a stuck close cannot hang the caller
+                    LOG.info("Connected Client channel close timed out waiting for result");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // re-assert the interrupt; Thread.interrupted() would only clear the flag
+                LOG.debug("Close of connected client channel interrupted while awaiting result");
+            } finally {
+                clientChannel = null; // drop the reference whether or not the close completed, so initChannel accepts the next client
+            }
+        }
+    }
+
     @Override
     public void stop() throws InterruptedException {
         if (started.compareAndSet(true, false)) {
@@ -448,6 +480,7 @@ public final class Netty4Server implements NettyServer {
             LOG.info("NettyServerHandler: channel has gone inactive: {}", ctx.channel());
             ctx.close();
             ctx.fireChannelInactive();
+            Netty4Server.this.clientChannel = null;
         }
 
         @Override
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
index 11cf9652..0148b091 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
@@ -100,14 +100,16 @@ public final class Netty5Server implements NettyServer {
 
     private final Consumer<ByteBuffer> inputConsumer;
     private final Runnable connectedRunnable;
+    private final Runnable disconnectedRunnable;
 
-    public Netty5Server(ProtonTestServerOptions options, Runnable connectedRunnable, Consumer<ByteBuffer> inputConsumer) {
+    public Netty5Server(ProtonTestServerOptions options, Runnable connectedRunnable, Runnable disconnectedRunnable, Consumer<ByteBuffer> inputConsumer) {
         Objects.requireNonNull(options);
         Objects.requireNonNull(inputConsumer);
         Objects.requireNonNull(connectedRunnable);
 
         this.options = options;
         this.connectedRunnable = connectedRunnable;
+        this.disconnectedRunnable = disconnectedRunnable;
         this.inputConsumer = inputConsumer;
     }
 
@@ -284,6 +286,12 @@ public final class Netty5Server implements NettyServer {
                 ctx.fireChannelActive();
             }
 
+            @Override
+            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+                disconnectedRunnable.run();
+                ctx.fireChannelInactive();
+            }
+
             @Override
             protected void messageReceived(ChannelHandlerContext ctx, Buffer input) throws Exception {
                 LOG.trace("AMQP Test Server Channel read: {}", input);
@@ -373,6 +381,26 @@ public final class Netty5Server implements NettyServer {
         stop();
     }
 
+    @Override
+    public void disconnectClient() throws Exception { // closes the current client channel, if any; the server channel is not touched
+        if (!started.get() || !serverChannel.isOpen()) {
+            throw new IllegalStateException("Server must be currently active in order to reset");
+        }
+
+        if (clientChannel != null) {
+            try {
+                if (!clientChannel.close().asStage().await(10, TimeUnit.SECONDS)) { // bounded wait so a stuck close cannot hang the caller
+                    LOG.info("Connected Client channel close timed out waiting for result");
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // re-assert the interrupt; Thread.interrupted() would only clear the flag
+                LOG.debug("Close of connected client channel interrupted while awaiting result");
+            } finally {
+                clientChannel = null; // drop the reference whether or not the close completed
+            }
+        }
+    }
+
     @Override
     public int getServerPort() {
         if (!started.get()) {
@@ -446,6 +474,7 @@ public final class Netty5Server implements NettyServer {
             LOG.info("NettyServerHandler: channel has gone inactive: {}", ctx.channel());
             ctx.close();
             ctx.fireChannelInactive();
+            Netty5Server.this.clientChannel = null;
         }
 
         @Override
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
index 47ab62e6..3027fe91 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
@@ -58,6 +58,37 @@ class ProtonTestClientTest extends TestPeerTestsBase {
         }
     }
 
+    @Test
+    public void testTwoClientConnectionsAndExchangeAMQPHeaders() throws Exception {
+        try (ProtonTestServer peer = new ProtonTestServer()) {
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.expectAMQPHeader().respondWithAMQPHeader();
+            peer.start();
+
+            URI remoteURI = peer.getServerURI();
+
+            // Server can accept two connections, although not at the same time.
+
+            try (ProtonTestClient client = new ProtonTestClient()) {
+                client.connect(remoteURI.getHost(), remoteURI.getPort());
+                client.expectAMQPHeader();
+                client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+                client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            }
+
+            try (ProtonTestClient client = new ProtonTestClient()) {
+                client.connect(remoteURI.getHost(), remoteURI.getPort());
+                client.expectAMQPHeader();
+                client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+                client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            }
+
+            LOG.info("Test started, peer listening on: {}", remoteURI);
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+        }
+    }
+
     @Test
     public void testClientDetectsUnexpectedPerformativeResponseToAMQPHeader() throws Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org