You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2021/06/02 17:22:07 UTC

[qpid-protonj2] 02/02: PROTON-2393 Allow AMQP Frames to be fired on connection established

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git

commit 34170e03cc2f8ba807d22bdc7d2402d31c16b76a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Fri May 28 13:41:15 2021 -0400

    PROTON-2393 Allow AMQP Frames to be fired on connection established
    
    Allows actions to be queued to fire on connect, either when a server
    accepts a new client connection or when a client connects successfully
    to a server.
---
 .../apache/qpid/protonj2/test/driver/AMQPTestDriver.java   |  9 +++++++++
 .../apache/qpid/protonj2/test/driver/ProtonTestClient.java | 14 +++++++++++++-
 .../qpid/protonj2/test/driver/ProtonTestConnector.java     |  5 +++++
 .../apache/qpid/protonj2/test/driver/ProtonTestPeer.java   |  2 ++
 .../apache/qpid/protonj2/test/driver/ProtonTestServer.java | 12 ++++++++++++
 5 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index bd95058..0540588 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -247,6 +247,15 @@ public class AMQPTestDriver implements Consumer<ByteBuffer> {
 
     //----- Test driver handling of decoded AMQP frames
 
+    void handleConnectedEstablished() throws AssertionError {
+        synchronized (script) {
+            ScriptedElement peekNext = script.peek();
+            if (peekNext instanceof ScriptedAction) {
+                prcessScript(peekNext);
+            }
+        }
+    }
+
     void handleHeader(AMQPHeader header) throws AssertionError {
         synchronized (script) {
             final ScriptedElement scriptEntry = script.poll();
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
index 9541f4b..324119f 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestClient.java
@@ -79,6 +79,12 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
     }
 
     @Override
+    protected void processConnectionEstablished() {
+        LOG.trace("AMQP Client connected to remote.");
+        driver.handleConnectedEstablished();
+    }
+
+    @Override
     protected void processCloseRequest() {
         try {
             client.close();
@@ -89,7 +95,7 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
 
     @Override
     protected void processDriverOutput(ByteBuffer frame) {
-        LOG.trace("AMQP Server Channel writing: {}", frame);
+        LOG.trace("AMQP Client Channel writing: {}", frame);
         client.write(frame);
     }
 
@@ -183,6 +189,12 @@ public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
             return new SimpleChannelInboundHandler<ByteBuf>() {
 
                 @Override
+                public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                    processConnectionEstablished();
+                    ctx.fireChannelActive();
+                }
+
+                @Override
                 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
                     LOG.trace("AMQP Test Client Channel read: {}", input);
 
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
index c85dd50..5375fe3 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestConnector.java
@@ -72,4 +72,9 @@ public class ProtonTestConnector extends ProtonTestPeer implements Consumer<Byte
     protected void processDriverOutput(ByteBuffer frame) {
         inputConsumer.accept(frame);
     }
+
+    @Override
+    protected void processConnectionEstablished() {
+        driver.handleConnectedEstablished();
+    }
 }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
index 4a1ac4e..6dc4d1a 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestPeer.java
@@ -105,6 +105,8 @@ public abstract class ProtonTestPeer extends ScriptWriter implements AutoCloseab
 
     protected abstract void processDriverOutput(ByteBuffer frame);
 
+    protected abstract void processConnectionEstablished();
+
     protected void checkClosed() {
         if (closed.get()) {
             throw new IllegalStateException("The test peer is closed");
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
index dfdfff0..61e42cc 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/ProtonTestServer.java
@@ -131,6 +131,12 @@ public class ProtonTestServer extends ProtonTestPeer {
             return new SimpleChannelInboundHandler<ByteBuf>() {
 
                 @Override
+                public void channelActive(ChannelHandlerContext ctx) throws Exception {
+                    processConnectionEstablished();
+                    ctx.fireChannelActive();
+                }
+
+                @Override
                 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
                     LOG.trace("AMQP Test Server Channel read: {}", input);
 
@@ -231,6 +237,12 @@ public class ProtonTestServer extends ProtonTestPeer {
         server.write(frame);
     }
 
+    @Override
+    protected void processConnectionEstablished() {
+        LOG.trace("AMQP Server has a client connected.");
+        driver.handleConnectedEstablished();
+    }
+
     protected void processDriverAssertion(AssertionError error) {
         LOG.trace("AMQP Server Closing due to error: {}", error.getMessage());
         close();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org