You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/03/30 22:48:54 UTC
[qpid-protonj2] branch main updated: PROTON-2702 Allow the server to accept a new connection
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new f5cdc0d3 PROTON-2702 Allow the server to accept a new connection
f5cdc0d3 is described below
commit f5cdc0d3329d47bd38d5436bedcc73f0d1f8dfaf
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Thu Mar 30 18:48:42 2023 -0400
PROTON-2702 Allow the server to accept a new connection
If the active connection drops or is disconnected allow the server to be
scripted to accept a new client connection.
---
.../qpid/protonj2/client/impl/ReconnectTest.java | 4 +-
.../qpid/protonj2/test/driver/AMQPTestDriver.java | 15 ++-
.../protonj2/test/driver/ProtonTestClient.java | 19 ++-
.../protonj2/test/driver/ProtonTestConnector.java | 12 +-
.../qpid/protonj2/test/driver/ProtonTestPeer.java | 96 +++++++++++++--
.../protonj2/test/driver/ProtonTestServer.java | 58 ++++++++-
.../test/driver/actions/ConnectionDropAction.java | 4 +-
...tionDropAction.java => PeerShutdownAction.java} | 12 +-
.../protonj2/test/driver/netty/NettyIOBuilder.java | 8 +-
.../protonj2/test/driver/netty/NettyServer.java | 130 ++++++++++++++++++---
.../test/driver/netty/netty4/Netty4Server.java | 37 +++++-
.../test/driver/netty/netty5/Netty5Server.java | 31 ++++-
.../protonj2/test/driver/ProtonTestClientTest.java | 31 +++++
13 files changed, 406 insertions(+), 51 deletions(-)
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java
index 84adf200..28cf7789 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/ReconnectTest.java
@@ -57,7 +57,7 @@ public class ReconnectTest extends ImperativeClientTestCase {
firstPeer.expectSASLAnonymousConnect();
firstPeer.expectOpen().withContainerId(any(String.class)).respond();
- firstPeer.dropAfterLastHandler(5);
+ firstPeer.shutdownAfterLastHandler(5);
firstPeer.start();
finalPeer.expectSASLAnonymousConnect();
@@ -101,7 +101,7 @@ public class ReconnectTest extends ImperativeClientTestCase {
finalPeer.waitForScriptToComplete();
finalPeer.expectBegin().respond();
finalPeer.expectEnd().respond();
- finalPeer.dropAfterLastHandler(10);
+ finalPeer.shutdownAfterLastHandler(10);
Session session = connection.openSession().openFuture().get();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index a277de7a..9403e01d 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -253,6 +253,15 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
}
}
+ void handleConnectedDropped() throws AssertionError {
+ synchronized (script) {
+ // For now we just reset the parser as any new connection would need to
+ // send an AMQP header, other validation could be added if we expand
+ // processing on client disconnect events.
+ frameParser.resetToExpectingHeader();
+ }
+ }
+
void handleHeader(AMQPHeader header) throws AssertionError {
synchronized (script) {
final ScriptedElement scriptEntry = script.poll();
@@ -590,7 +599,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
final ByteBuffer output;
final ByteBuffer buffer = frameEncoder.handleWrite(performative, channel, payload, null);
- if (deferredWrites != null) {
+ if (!deferredWrites.isEmpty()) {
deferredWrites.add(buffer);
try {
output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
@@ -644,7 +653,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
try {
final ByteBuffer buffer = frameEncoder.handleWrite(performative, channel);
- if (deferredWrites != null) {
+ if (!deferredWrites.isEmpty()) {
deferredWrites.add(buffer);
try {
output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
@@ -674,7 +683,7 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
try {
final ByteBuffer output;
- if (deferredWrites != null) {
+ if (!deferredWrites.isEmpty()) {
LOG.trace("{} appending deferred buffer {} to next write.", driverName, deferredWrites);
deferredWrites.add(ByteBuffer.wrap(header.getBuffer()).asReadOnlyBuffer());
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index b9eade1d..97c59e28 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -23,9 +23,9 @@ import java.util.function.Supplier;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
-import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
import org.apache.qpid.protonj2.test.driver.netty.NettyClient;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
+import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,8 +63,8 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
this::processDriverAssertion,
this::eventLoop);
this.client = NettyIOBuilder.createClient(options,
- this::processConnectionEstablished,
- this::processChannelInput);
+ this::processConnectionEstablished,
+ this::processChannelInput);
}
public void connect(String hostname, int port) throws IOException {
@@ -83,7 +83,13 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
}
@Override
- protected void processCloseRequest() {
+ protected void processConnectionDropped() {
+ LOG.trace("AMQP Client connection to remote dropped.");
+ driver.handleConnectedDropped();
+ }
+
+ @Override
+ protected void processCloseConnectionRequest() {
try {
client.close();
} catch (Throwable e) {
@@ -91,6 +97,11 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
}
}
+ @Override
+ protected void processPeerShutdownRequest() {
+ processCloseConnectionRequest(); // Same outcome as the client is single use only.
+ }
+
@Override
protected void processDriverOutput(ByteBuffer frame) {
LOG.trace("AMQP Client Channel writing: {}", frame);
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
index 5375fe3f..fc577554 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
@@ -64,7 +64,12 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte
//----- Internal implementation which can be overridden
@Override
- protected void processCloseRequest() {
+ protected void processCloseConnectionRequest() {
+ // nothing to do in this peer implementation.
+ }
+
+ @Override
+ protected void processPeerShutdownRequest() {
// nothing to do in this peer implementation.
}
@@ -77,4 +82,9 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte
protected void processConnectionEstablished() {
driver.handleConnectedEstablished();
}
+
+ @Override
+ protected void processConnectionDropped() {
+ driver.handleConnectedDropped();
+ }
}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
index 6dc4d1af..78d06448 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.protonj2.test.driver.actions.ConnectionDropAction;
+import org.apache.qpid.protonj2.test.driver.actions.PeerShutdownAction;
/**
* Abstract base class that is implemented by all the AMQP v1.0 test peer
@@ -38,43 +39,91 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
- processCloseRequest();
+ processPeerShutdownRequest();
}
}
+ /**
+ * Disconnects any currently active connection to this peer, or from this peer if it
+ * is a client type connection. Depending on the peer implementation this could be
+ * a terminal action. A server implementation should allow for a new connection to
+ * be made to it following dropping any existing client connection.
+ */
+ public void dropConnection() {
+ if (closed.compareAndSet(false, true)) {
+ processCloseConnectionRequest();
+ }
+ }
+
+ /**
+ * Waits for the test script to complete and ignores any errors seen. This method is
+ * generally only useful for scripts that complete other specific waits and then want
+ * to wait for a shutdown sequence without scripting an exact order of events.
+ */
public void waitForScriptToCompleteIgnoreErrors() {
getDriver().waitForScriptToCompleteIgnoreErrors();
}
+ /**
+ * Waits for the configured test script to complete and will throw any errors
+ * from the expected script as {@link AssertionError} exceptions.
+ */
public void waitForScriptToComplete() {
getDriver().waitForScriptToComplete();
}
+ /**
+ * Waits for the configured test script to complete and will throw any errors
+ * from the expected script as {@link AssertionError} exceptions.
+ *
+ * @param timeout
+ * The time to wait in milliseconds before failing the wait.
+ */
public void waitForScriptToComplete(long timeout) {
getDriver().waitForScriptToComplete(timeout);
}
+ /**
+ * Waits for the configured test script to complete and will throw any errors
+ * from the expected script as {@link AssertionError} exceptions.
+ *
+ * @param timeout
+ * The time to wait before failing the wait.
+ * @param units
+ * The units to use for the given time interval.
+ */
public void waitForScriptToComplete(long timeout, TimeUnit units) {
getDriver().waitForScriptToComplete(timeout, units);
}
+ /**
+ * @return the total number of AMQP idle frames that were received.
+ */
public int getEmptyFrameCount() {
return getDriver().getEmptyFrameCount();
}
+ /**
+ * @return the total number of AMQP performatives that were read so far.
+ */
public int getPerformativeCount() {
return getDriver().getPerformativeCount();
}
+ /**
+ * @return the total number of SASL performatives that were read so far.
+ */
public int getSaslPerformativeCount() {
return getDriver().getSaslPerformativeCount();
}
/**
* Drops the connection to the connected client immediately after the last handler that was
- * registered before this scripted action is queued. Adding any additional test scripting to
- * the test driver will either not be acted on or could cause the wait methods to not return
- * as they will never be invoked.
+ * registered before this scripted action is queued. Depending on the test peer this action
+ * could be a terminal one meaning no other scripted elements could be added and doing so might
+ * result in wait for completion calls to hang. In general any server type peer should allow
+ * reconnects after dropping a client connection but a client peer would be expected not to
+ * allow any action following a drop.
*
* @return this test peer instance.
*/
@@ -85,9 +134,11 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
/**
* Drops the connection to the connected client immediately after the last handler that was
- * registered before this scripted action is queued. Adding any additional test scripting to
- * the test driver will either not be acted on or could cause the wait methods to not return
- * as they will never be invoked.
+ * registered before this scripted action is queued. Depending on the test peer this action
+ * could be a terminal one meaning no other scripted elements could be added and doing so might
+ * result in wait for completion calls to hang. In general any server type peer should allow
+ * reconnects after dropping a client connection but a client peer would be expected not to
+ * allow any action following a drop.
*
* @param delay
* The time in milliseconds to wait before running the action after the last handler is run.
@@ -99,14 +150,43 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
return this;
}
+ /**
+ * Shuts down the peer dropping any connections and rendering the peer not usable for
+ * any new actions to be added.
+ *
+ * @return this test peer instance.
+ */
+ public ProtonTestPeer shutdownAfterLastHandler() {
+ getDriver().addScriptedElement(new PeerShutdownAction(this));
+ return this;
+ }
+
+ /**
+ * Shuts down the peer dropping any connections and rendering the peer not usable for
+ * any new actions to be added.
+ *
+ * @param delay
+ * The time in milliseconds to wait before running the action after the last handler is run.
+ *
+ * @return this test peer instance.
+ */
+ public ProtonTestPeer shutdownAfterLastHandler(int delay) {
+ getDriver().addScriptedElement(new PeerShutdownAction(this).afterDelay(delay));
+ return this;
+ }
+
protected abstract String getPeerName();
- protected abstract void processCloseRequest();
+ protected abstract void processCloseConnectionRequest();
+
+ protected abstract void processPeerShutdownRequest();
protected abstract void processDriverOutput(ByteBuffer frame);
protected abstract void processConnectionEstablished();
+ protected abstract void processConnectionDropped();
+
protected void checkClosed() {
if (closed.get()) {
throw new IllegalStateException("The test peer is closed");
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index ad2c2884..b4ab29ce 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -23,10 +23,11 @@ import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
+import org.apache.qpid.protonj2.test.driver.actions.ConnectionDropAction;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
-import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
+import org.apache.qpid.protonj2.test.driver.netty.NettyIOBuilder;
import org.apache.qpid.protonj2.test.driver.netty.NettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,10 +62,13 @@ public class ProtonTestServer extends ProtonTestPeer {
* The options that control the behavior of the deployed server.
*/
public ProtonTestServer(ProtonTestServerOptions options) {
- this.driver = new NettyAwareAMQPTestDriver(this::processDriverOutput, this::processDriverAssertion, this::eventLoop);
+ this.driver = new NettyAwareAMQPTestDriver(this::processDriverOutput,
+ this::processDriverAssertion,
+ this::eventLoop);
this.server = NettyIOBuilder.createServer(options,
- this::processConnectionEstablished,
- this::processChannelInput);
+ this::processConnectionEstablished,
+ this::processConnectionDropped,
+ this::processChannelInput);
}
/**
@@ -119,6 +123,35 @@ public class ProtonTestServer extends ProtonTestPeer {
}
}
+ /**
+ * Drops the connection to the connected client immediately after the last handler that was
+ * registered before this scripted action is queued. Adding any additional test scripting to
+ * the test driver will either not be acted on or could cause the wait methods to not return
+ * as they will never be invoked.
+ *
+ * @return this test peer instance.
+ */
+ public ProtonTestPeer dropAfterLastHandler() {
+ getDriver().addScriptedElement(new ConnectionDropAction(this));
+ return this;
+ }
+
+ /**
+ * Drops the connection to the connected client immediately after the last handler that was
+ * registered before this scripted action is queued. Adding any additional test scripting to
+ * the test driver will either not be acted on or could cause the wait methods to not return
+ * as they will never be invoked.
+ *
+ * @param delay
+ * The time in milliseconds to wait before running the action after the last handler is run.
+ *
+ * @return this test peer instance.
+ */
+ public ProtonTestPeer dropAfterLastHandler(int delay) {
+ getDriver().addScriptedElement(new ConnectionDropAction(this).afterDelay(delay));
+ return this;
+ }
+
public boolean isAcceptingConnections() {
return server.isAcceptingConnections();
}
@@ -250,7 +283,7 @@ public class ProtonTestServer extends ProtonTestPeer {
//----- Internal implementation which can be overridden
@Override
- protected void processCloseRequest() {
+ protected void processPeerShutdownRequest() {
try {
server.stopAsync();
} catch (Throwable e) {
@@ -258,6 +291,15 @@ public class ProtonTestServer extends ProtonTestPeer {
}
}
+ @Override
+ protected void processCloseConnectionRequest() {
+ try {
+ server.disconnectClient();
+ } catch (Throwable e) {
+ LOG.info("Error suppressed on server dropping connected client: ", e);
+ }
+ }
+
@Override
protected void processDriverOutput(ByteBuffer frame) {
LOG.trace("AMQP Server Channel writing: {}", frame);
@@ -270,6 +312,12 @@ public class ProtonTestServer extends ProtonTestPeer {
driver.handleConnectedEstablished();
}
+ @Override
+ protected void processConnectionDropped() {
+ LOG.trace("AMQP Server reports client connection dropped.");
+ driver.handleConnectedDropped();
+ }
+
protected void processDriverAssertion(AssertionError error) {
LOG.trace("AMQP Server Closing due to error: {}", error.getMessage());
close();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
index 45908f5b..7d08da65 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
@@ -38,8 +38,8 @@ public class ConnectionDropAction implements ScriptedAction {
@Override
public ScriptedAction now() {
- LOG.info("Connection Drop Action closing test peer as scripted");
- peer.close();
+ LOG.info("Connection Drop Action closing test peer connection as scripted");
+ peer.dropConnection();
return this;
}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/PeerShutdownAction.java
similarity index 86%
copy from protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
copy to protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/PeerShutdownAction.java
index 45908f5b..1ddd674c 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/ConnectionDropAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/PeerShutdownAction.java
@@ -25,20 +25,20 @@ import org.slf4j.LoggerFactory;
/**
* Action that drops the netty connection to the remote once invoked.
*/
-public class ConnectionDropAction implements ScriptedAction {
+public class PeerShutdownAction implements ScriptedAction {
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionDropAction.class);
+ private static final Logger LOG = LoggerFactory.getLogger(PeerShutdownAction.class);
private final ProtonTestPeer peer;
private int delay = -1;
- public ConnectionDropAction(ProtonTestPeer peer) {
+ public PeerShutdownAction(ProtonTestPeer peer) {
this.peer = peer;
}
@Override
public ScriptedAction now() {
- LOG.info("Connection Drop Action closing test peer as scripted");
+ LOG.info("Peer Shutdown Action closing test peer as scripted");
peer.close();
return this;
}
@@ -67,7 +67,7 @@ public class ConnectionDropAction implements ScriptedAction {
@Override
public ScriptedAction perform(AMQPTestDriver driver) {
- return ConnectionDropAction.this.now();
+ return PeerShutdownAction.this.now();
}
@Override
@@ -91,7 +91,7 @@ public class ConnectionDropAction implements ScriptedAction {
return delay;
}
- public ConnectionDropAction afterDelay(int delay) {
+ public PeerShutdownAction afterDelay(int delay) {
this.delay = delay;
return this;
}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java
index cfa728d9..39d77db9 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyIOBuilder.java
@@ -63,16 +63,18 @@ public interface NettyIOBuilder {
* The {@link ProtonTestServerOptions} that configure the IO Transport the context creates.
* @param connectedHandler
* A handler that should be invoked when a connection attempt succeeds.
+ * @param disconnectedHandler
+ * A handler that should be invoked when a connection is closed or dropped.
* @param inputHandler
* A {@link Consumer} that accept incoming {@link ByteBuffer} data from the remote.
*
* @return a new {@link NettyServer} from available options.
*/
- public static NettyServer createServer(ProtonTestServerOptions options, Runnable connectedHandler, Consumer<ByteBuffer> inputHandler) {
+ public static NettyServer createServer(ProtonTestServerOptions options, Runnable connectedHandler, Runnable disconnectedHandler, Consumer<ByteBuffer> inputHandler) {
if (Netty4Support.isAvailable()) {
- return new Netty4Server(options, connectedHandler, inputHandler);
+ return new Netty4Server(options, connectedHandler, disconnectedHandler, inputHandler);
} else if (Netty5Support.isAvailable()) {
- return new Netty5Server(options, connectedHandler, inputHandler);
+ return new Netty5Server(options, connectedHandler, disconnectedHandler, inputHandler);
}
throw new UnsupportedOperationException("Netty not available on the class path");
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
index 4291bee1..99d775c8 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/NettyServer.java
@@ -23,49 +23,151 @@ import java.nio.ByteBuffer;
import javax.net.ssl.SSLEngine;
/**
- *
+ * Provides the API for a Netty based server instance used by the test peer.
*/
public interface NettyServer extends AutoCloseable {
- boolean isSecureServer();
+ /**
+ * Starts the server instance which ends the configuration phase and allows for
+ * incoming connections to be accepted.
+ *
+ * @throws Exception if an error occurs during server start.
+ */
+ void start() throws Exception;
- boolean isAcceptingConnections();
+ /**
+ * Stops the server, waiting on termination of all server resources before
+ * the method returns.
+ *
+ * @throws InterruptedException
+ */
+ void stop() throws InterruptedException;
+
+ /**
+ * Stops the server and returns as soon as server shutdown has been
+ * successfully started but does not wait for all server resources to
+ * be fully shut down.
+ *
+ * @throws InterruptedException
+ */
+ void stopAsync() throws InterruptedException;
+
+ /**
+ * Close API that calls the blocking {@link #stop()} API and awaits the
+ * full server shutdown. Use this in try-with-resources style invocations.
+ */
+ @Override
+ void close() throws InterruptedException;
+
+ /**
+ * Disconnects any connected client and leaves the server in a state where a new
+ * client connection is possible.
+ *
+ * @throws Exception
+ */
+ void disconnectClient() throws Exception;
+
+ /**
+ * @return true if the server is using SSL as the transport layer.
+ */
+ boolean isSecureServer();
+ /**
+ * @return true if the server has an active SSL transport connection.
+ */
boolean hasSecureConnection();
+ /**
+ * @return true if the server is in a state where connections will be accepted.
+ */
+ boolean isAcceptingConnections();
+
+ /**
+ * @return true if there is an active connection to the server.
+ */
boolean hasClientConnection();
+ /**
+ * @return the remote port that the client connection is connected to.
+ */
int getClientPort();
+ /**
+ * @return has the SSL handshake for a client completed successfully.
+ */
boolean isPeerVerified();
+ /**
+ * @return the {@link SSLEngine} that was configured and assigned to this server.
+ */
SSLEngine getConnectionSSLEngine();
+ /**
+ * @return is the server exposing a WebSocket endpoint.
+ */
boolean isWebSocketServer();
+ /**
+ * @return the path that was configured on the WebSocket transport.
+ */
String getWebSocketPath();
+ /**
+ * Assign the WebSocket path that server should use, which must be done
+ * prior to starting the server.
+ *
+ * @param webSocketPath
+ * The web socket path to use when accepting connections.
+ */
void setWebSocketPath(String webSocketPath);
+ /**
+ * @return the configured max frame size for WebSocket connections.
+ */
int getMaxFrameSize();
+ /**
+ * Assign the max frame size value to configure on WebSocket connection which
+ * must be assigned prior to starting the server.
+ *
+ * @param maxFrameSize
+ * The max frame size in bytes to assign to web socket connections.
+ */
void setMaxFrameSize(int maxFrameSize);
+ /**
+ * Creates a connection URI for an AMQP client that provides the proper AMQP
+ * scheme format (amqp:// etc) along with the host and port values plus an optional
+ * query string provided by the caller.
+ *
+ * @param queryString
+ * optional query string to append to the returned URI
+ *
+ * @return a URI that a client could use to connect to the AMQP server.
+ *
+ * @throws Exception if an error occurs while creating the connection URI.
+ */
URI getConnectionURI(String queryString) throws Exception;
- void start() throws Exception;
-
- void write(ByteBuffer frame);
-
+ /**
+ * Writes the given buffer to the IO layer.
+ *
+ * @param buffer
+ * The buffer to send to the remote connection.
+ */
+ void write(ByteBuffer buffer);
+
+ /**
+ * @return the event loop that the server processing runs within.
+ */
NettyEventLoop eventLoop();
- void stop() throws InterruptedException;
-
- void stopAsync() throws InterruptedException;
-
- @Override
- void close() throws InterruptedException;
-
+ /**
+ * Gets the server port that new connections are accepted on, this should
+ * only be called after the {@link #start()} method has been called.
+ *
+ * @return the server port where connections should be made.
+ */
int getServerPort();
}
\ No newline at end of file
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
index cf7ba820..64df5b0a 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty4/Netty4Server.java
@@ -101,15 +101,18 @@ public final class Netty4Server implements NettyServer {
private final Consumer<ByteBuffer> inputConsumer;
private final Runnable connectedRunnable;
+ private final Runnable disconnectedRunnable;
- public Netty4Server(ProtonTestServerOptions options, Runnable connectedRunnable, Consumer<ByteBuffer> inputConsumer) {
+ public Netty4Server(ProtonTestServerOptions options, Runnable connectedRunnable, Runnable disconnectedRunnable, Consumer<ByteBuffer> inputConsumer) {
Objects.requireNonNull(options);
Objects.requireNonNull(inputConsumer);
Objects.requireNonNull(connectedRunnable);
+ Objects.requireNonNull(disconnectedRunnable);
this.options = options;
this.connectedRunnable = connectedRunnable;
this.inputConsumer = inputConsumer;
+ this.disconnectedRunnable = disconnectedRunnable;
}
@Override
@@ -248,7 +251,10 @@ public final class Netty4Server implements NettyServer {
@Override
public void initChannel(Channel ch) throws Exception {
// Don't accept any new connections.
- serverChannel.close();
+ if (clientChannel != null) {
+ throw new UnsupportedOperationException("Server cannot have more than one connected client at a time");
+ }
+
// Now we know who the client is
clientChannel = ch;
@@ -286,6 +292,12 @@ public final class Netty4Server implements NettyServer {
ctx.fireChannelActive();
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ disconnectedRunnable.run();
+ ctx.fireChannelInactive();
+ }
+
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
LOG.trace("AMQP Test Server Channel read: {}", input);
@@ -341,6 +353,26 @@ public final class Netty4Server implements NettyServer {
}
}
+ @Override
+ public void disconnectClient() throws Exception {
+ if (!started.get() || !serverChannel.isOpen()) {
+ throw new IllegalStateException("Server must be currently active in order to reset");
+ }
+
+ if (clientChannel != null) {
+ try {
+ if (!clientChannel.close().await(10, TimeUnit.SECONDS)) {
+ LOG.info("Connected Client channel close timed out waiting for result");
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ LOG.debug("Close of connected client channel interrupted while awaiting result");
+ } finally {
+ clientChannel = null;
+ }
+ }
+ }
+
@Override
public void stop() throws InterruptedException {
if (started.compareAndSet(true, false)) {
@@ -448,6 +480,7 @@ public final class Netty4Server implements NettyServer {
LOG.info("NettyServerHandler: channel has gone inactive: {}", ctx.channel());
ctx.close();
ctx.fireChannelInactive();
+ Netty4Server.this.clientChannel = null;
}
@Override
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
index 11cf9652..0148b091 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/netty/netty5/Netty5Server.java
@@ -100,14 +100,16 @@ public final class Netty5Server implements NettyServer {
private final Consumer<ByteBuffer> inputConsumer;
private final Runnable connectedRunnable;
+ private final Runnable disconnectedRunnable;
- public Netty5Server(ProtonTestServerOptions options, Runnable connectedRunnable, Consumer<ByteBuffer> inputConsumer) {
+ public Netty5Server(ProtonTestServerOptions options, Runnable connectedRunnable, Runnable disconnectedRunnable, Consumer<ByteBuffer> inputConsumer) {
Objects.requireNonNull(options);
Objects.requireNonNull(inputConsumer);
Objects.requireNonNull(connectedRunnable);
this.options = options;
this.connectedRunnable = connectedRunnable;
+ this.disconnectedRunnable = disconnectedRunnable;
this.inputConsumer = inputConsumer;
}
@@ -284,6 +286,12 @@ public final class Netty5Server implements NettyServer {
ctx.fireChannelActive();
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ disconnectedRunnable.run();
+ ctx.fireChannelInactive();
+ }
+
@Override
protected void messageReceived(ChannelHandlerContext ctx, Buffer input) throws Exception {
LOG.trace("AMQP Test Server Channel read: {}", input);
@@ -373,6 +381,26 @@ public final class Netty5Server implements NettyServer {
stop();
}
+ @Override
+ public void disconnectClient() throws Exception {
+ if (!started.get() || !serverChannel.isOpen()) {
+ throw new IllegalStateException("Server must be currently active in order to reset");
+ }
+
+ if (clientChannel != null) {
+ try {
+ if (!clientChannel.close().asStage().await(10, TimeUnit.SECONDS)) {
+ LOG.info("Connected Client channel close timed out waiting for result");
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ LOG.debug("Close of connected client channel interrupted while awaiting result");
+ } finally {
+ clientChannel = null;
+ }
+ }
+ }
+
@Override
public int getServerPort() {
if (!started.get()) {
@@ -446,6 +474,7 @@ public final class Netty5Server implements NettyServer {
LOG.info("NettyServerHandler: channel has gone inactive: {}", ctx.channel());
ctx.close();
ctx.fireChannelInactive();
+ Netty5Server.this.clientChannel = null;
}
@Override
diff --git a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
index 47ab62e6..3027fe91 100644
--- a/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
+++ b/protonj2-test-driver/src/test/java/org/apache/qpid/protonj2/test/driver/ProtonTestClientTest.java
@@ -58,6 +58,37 @@ class ProtonTestClientTest extends TestPeerTestsBase {
}
}
+ @Test
+ public void testTwoClientConnectionsAndExchangeAMQPHeaders() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.expectAMQPHeader().respondWithAMQPHeader();
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ // Server can accept two connections, although not at the same time.
+
+ try (ProtonTestClient client = new ProtonTestClient()) {
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ try (ProtonTestClient client = new ProtonTestClient()) {
+ client.connect(remoteURI.getHost(), remoteURI.getPort());
+ client.expectAMQPHeader();
+ client.remoteHeader(AMQPHeader.getAMQPHeader()).now();
+ client.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
@Test
public void testClientDetectsUnexpectedPerformativeResponseToAMQPHeader() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org