You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2023/04/19 15:21:05 UTC

[qpid-protonj2] branch main updated: PROTON-2713 Ensure connection drops before checking post condition

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new f4176c46 PROTON-2713 Ensure connection drops before checking post condition
f4176c46 is described below

commit f4176c46055fab950eb2e41a8fb6b1f35b8f007a
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed Apr 19 11:10:59 2023 -0400

    PROTON-2713 Ensure connection drops before checking post condition
    
    The tests need to wait until the client has seen the peer drop the
    connection before checking the failure case.
---
 .../protonj2/client/impl/StreamSenderTest.java     | 33 ++++++++++++++++------
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
index 4577b694..537d6da1 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/StreamSenderTest.java
@@ -2471,7 +2471,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
     }
 
     @Test
-    void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
+    public void testStreamMessageFlushFailsAfterConnectionDropped() throws Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
             peer.expectSASLAnonymousConnect();
             peer.expectOpen().respond();
@@ -2480,12 +2480,15 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.remoteFlow().withLinkCredit(1).queue();
             peer.start();
 
-            URI remoteURI = peer.getServerURI();
+            final URI remoteURI = peer.getServerURI();
+            final CountDownLatch disconnected = new CountDownLatch(1);
 
             LOG.info("Test started, peer listening on: {}", remoteURI);
 
             Client container = Client.create();
-            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            ConnectionOptions connectOptions = new ConnectionOptions();
+            connectOptions.disconnectedHandler((c, e) -> disconnected.countDown());
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectOptions);
             StreamSender sender = connection.openStreamSender("test-queue");
             StreamSenderMessage message = sender.beginMessage();
 
@@ -2515,6 +2518,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             // Next write should fail as connection should have dropped.
             stream.write(new byte[] { 8, 9, 10, 11 });
 
+            assertTrue(disconnected.await(5, TimeUnit.SECONDS));
+
             try {
                 stream.flush();
                 fail("Should not be able to flush after connection drop");
@@ -2529,7 +2534,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
     }
 
     @Test
-    void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws Exception {
+    public void testStreamMessageCloseThatFlushesFailsAfterConnectionDropped() throws Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
             peer.expectSASLAnonymousConnect();
             peer.expectOpen().respond();
@@ -2538,12 +2543,15 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.remoteFlow().withLinkCredit(1).queue();
             peer.start();
 
-            URI remoteURI = peer.getServerURI();
+            final URI remoteURI = peer.getServerURI();
+            final CountDownLatch disconnected = new CountDownLatch(1);
 
             LOG.info("Test started, peer listening on: {}", remoteURI);
 
             Client container = Client.create();
-            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            ConnectionOptions connectOptions = new ConnectionOptions();
+            connectOptions.disconnectedHandler((c, e) -> disconnected.countDown());
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectOptions);
             StreamSender sender = connection.openStreamSender("test-queue");
             StreamSenderMessage message = sender.beginMessage();
 
@@ -2573,6 +2581,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             // Next write should fail as connection should have dropped.
             stream.write(new byte[] { 8, 9, 10, 11 });
 
+            assertTrue(disconnected.await(5, TimeUnit.SECONDS));
+
             try {
                 stream.close();
                 fail("Should not be able to close after connection drop");
@@ -2587,7 +2597,7 @@ public class StreamSenderTest extends ImperativeClientTestCase {
     }
 
     @Test
-    void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws Exception {
+    public void testStreamMessageWriteThatFlushesFailsAfterConnectionDropped() throws Exception {
         try (ProtonTestServer peer = new ProtonTestServer()) {
             peer.expectSASLAnonymousConnect();
             peer.expectOpen().respond();
@@ -2597,12 +2607,15 @@ public class StreamSenderTest extends ImperativeClientTestCase {
             peer.dropAfterLastHandler();
             peer.start();
 
-            URI remoteURI = peer.getServerURI();
+            final URI remoteURI = peer.getServerURI();
+            final CountDownLatch disconnected = new CountDownLatch(1);
 
             LOG.info("Test started, peer listening on: {}", remoteURI);
 
             Client container = Client.create();
-            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort());
+            ConnectionOptions connectOptions = new ConnectionOptions();
+            connectOptions.disconnectedHandler((c, e) -> disconnected.countDown());
+            Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), connectOptions);
             StreamSenderOptions options = new StreamSenderOptions().writeBufferSize(1024);
             StreamSender sender = connection.openStreamSender("test-queue", options);
             StreamSenderMessage message = sender.beginMessage();
@@ -2614,6 +2627,8 @@ public class StreamSenderTest extends ImperativeClientTestCase {
 
             peer.waitForScriptToComplete();
 
+            assertTrue(disconnected.await(5, TimeUnit.SECONDS));
+
             try {
                 stream.write(payload);
                 fail("Should not be able to write section after connection drop");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org