You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2021/06/02 17:22:07 UTC
[qpid-protonj2] 02/02: PROTON-2393 Allow AMQP Frames to be fired on
connection established
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
commit 34170e03cc2f8ba807d22bdc7d2402d31c16b76a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Fri May 28 13:41:15 2021 -0400
PROTON-2393 Allow AMQP Frames to be fired on connection established
Allows scripted actions to be queued and then fired once a connection is
established, either when the server accepts a new client connection or
when the client successfully connects to a server.
---
.../apache/qpid/protonj2/test/driver/AMQPTestDriver.java | 9 +++++++++
.../apache/qpid/protonj2/test/driver/ProtonTestClient.java | 14 +++++++++++++-
.../qpid/protonj2/test/driver/ProtonTestConnector.java | 5 +++++
.../apache/qpid/protonj2/test/driver/ProtonTestPeer.java | 2 ++
.../apache/qpid/protonj2/test/driver/ProtonTestServer.java | 12 ++++++++++++
5 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index bd95058..0540588 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -247,6 +247,15 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
//----- Test driver handling of decoded AMQP frames
+ void handleConnectedEstablished() throws AssertionError {
+ synchronized (script) {
+ ScriptedElement peekNext = script.peek();
+ if (peekNext instanceof ScriptedAction) {
+ prcessScript(peekNext);
+ }
+ }
+ }
+
void handleHeader(AMQPHeader header) throws AssertionError {
synchronized (script) {
final ScriptedElement scriptEntry = script.poll();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index 9541f4b..324119f 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -79,6 +79,12 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
}
@Override
+ protected void processConnectionEstablished() {
+ LOG.trace("AMQP Client connected to remote.");
+ driver.handleConnectedEstablished();
+ }
+
+ @Override
protected void processCloseRequest() {
try {
client.close();
@@ -89,7 +95,7 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
@Override
protected void processDriverOutput(ByteBuffer frame) {
- LOG.trace("AMQP Server Channel writing: {}", frame);
+ LOG.trace("AMQP Client Channel writing: {}", frame);
client.write(frame);
}
@@ -183,6 +189,12 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
return new SimpleChannelInboundHandler<ByteBuf>() {
@Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ processConnectionEstablished();
+ ctx.fireChannelActive();
+ }
+
+ @Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
LOG.trace("AMQP Test Client Channel read: {}", input);
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
index c85dd50..5375fe3 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
@@ -72,4 +72,9 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte
protected void processDriverOutput(ByteBuffer frame) {
inputConsumer.accept(frame);
}
+
+ @Override
+ protected void processConnectionEstablished() {
+ driver.handleConnectedEstablished();
+ }
}
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
index 4a1ac4e..6dc4d1a 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
@@ -105,6 +105,8 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
protected abstract void processDriverOutput(ByteBuffer frame);
+ protected abstract void processConnectionEstablished();
+
protected void checkClosed() {
if (closed.get()) {
throw new IllegalStateException("The test peer is closed");
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index dfdfff0..61e42cc 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -131,6 +131,12 @@ public class ProtonTestServer extends ProtonTestPeer {
return new SimpleChannelInboundHandler<ByteBuf>() {
@Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ processConnectionEstablished();
+ ctx.fireChannelActive();
+ }
+
+ @Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
LOG.trace("AMQP Test Server Channel read: {}", input);
@@ -231,6 +237,12 @@ public class ProtonTestServer extends ProtonTestPeer {
server.write(frame);
}
+ @Override
+ protected void processConnectionEstablished() {
+ LOG.trace("AMQP Server has a client connected.");
+ driver.handleConnectedEstablished();
+ }
+
protected void processDriverAssertion(AssertionError error) {
LOG.trace("AMQP Server Closing due to error: {}", error.getMessage());
close();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org