You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2022/02/28 22:33:07 UTC
[qpid-protonj2] branch main updated: PROTON-2510 Clear handlers from closed coordinator link
This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push:
new 77e2c9f PROTON-2510 Clear handlers from closed coordinator link
77e2c9f is described below
commit 77e2c9f6445a8a1870aebb4f2dcfddb25de4b694
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Mon Feb 28 17:32:53 2022 -0500
PROTON-2510 Clear handlers from closed coordinator link
If the handlers are not cleared and a detach response is later than the
next txn begin the coodinator can close the new link in error.
---
.../client/impl/ClientLocalTransactionContext.java | 12 ++++
.../protonj2/client/impl/TransactionsTest.java | 66 ++++++++++++++++++++--
2 files changed, 74 insertions(+), 4 deletions(-)
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java
index 4c0d84f..f8933fc 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientLocalTransactionContext.java
@@ -415,6 +415,18 @@ final class ClientLocalTransactionContext implements ClientTransactionContext {
}
private void handleCoordinatorLocalClose(TransactionController controller) {
+ // Disconnect from the controllers event points since we could create a new
+ // controller if a new transaction is requested by the client.
+ controller.declaredHandler(null)
+ .declareFailureHandler(null)
+ .dischargedHandler(null)
+ .dischargeFailureHandler(null)
+ .openHandler(null)
+ .closeHandler(null)
+ .localCloseHandler(null)
+ .parentEndpointClosedHandler(null)
+ .engineShutdownHandler(null);
+
if (currentTxn != null) {
ClientFuture<Session> future = null;
diff --git a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
index 3ad4c97..8d71b0b 100644
--- a/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
+++ b/protonj2-client/src/test/java/org/apache/qpid/protonj2/client/impl/TransactionsTest.java
@@ -63,11 +63,10 @@ import org.apache.qpid.protonj2.types.messaging.Released;
import org.apache.qpid.protonj2.types.transactions.TransactionErrors;
import org.apache.qpid.protonj2.types.transport.AmqpError;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Timeout(30)
+//@Timeout(30)
public class TransactionsTest extends ImperativeClientTestCase {
private static final Logger LOG = LoggerFactory.getLogger(TransactionsTest.class);
@@ -202,7 +201,7 @@ public class TransactionsTest extends ImperativeClientTestCase {
// Expect this to fail since transaction not declared
}
- peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(500, TimeUnit.SECONDS);
peer.expectCoordinatorAttach().respond();
peer.remoteFlow().withLinkCredit(2).queue();
peer.expectDeclare().accept();
@@ -215,7 +214,66 @@ public class TransactionsTest extends ImperativeClientTestCase {
session.closeAsync();
connection.closeAsync().get();
- peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.waitForScriptToComplete(500, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testTimedOutExceptionOnBeginWithNoResponseThenRecoverWithNextBeginAndDelayedDetachResponse() throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectCoordinatorAttach().respond();
+ peer.remoteFlow().withLinkCredit(2).queue();
+ peer.expectDeclare();
+ peer.expectDetach().respond().afterDelay(20);
+ peer.start();
+
+ URI remoteURI = peer.getServerURI();
+
+ LOG.info("Test started, peer listening on: {}", remoteURI);
+
+ Client container = Client.create();
+ ConnectionOptions options = new ConnectionOptions().requestTimeout(150);
+ Connection connection = container.connect(remoteURI.getHost(), remoteURI.getPort(), options);
+ Session session = connection.openSession().openFuture().get();
+
+ try {
+ session.beginTransaction();
+ fail("Begin should have timed out after no response.");
+ } catch (ClientTransactionDeclarationException expected) {
+ // Expect this to time out.
+ }
+
+ try {
+ session.commitTransaction();
+ fail("Commit should have failed due to no active transaction.");
+ } catch (ClientIllegalStateException expected) {
+ // Expect this to fail since transaction not declared
+ }
+
+ try {
+ session.rollbackTransaction();
+ fail("Rollback should have failed due to no active transaction.");
+ } catch (ClientIllegalStateException expected) {
+ // Expect this to fail since transaction not declared
+ }
+
+ peer.waitForScriptToComplete(500, TimeUnit.SECONDS);
+ peer.expectCoordinatorAttach().respond();
+ peer.remoteFlow().withLinkCredit(2).queue();
+ peer.expectDeclare().accept();
+ peer.expectDischarge().accept();
+ peer.expectEnd().respond();
+ peer.expectClose().respond();
+
+ session.beginTransaction();
+ session.commitTransaction();
+ session.closeAsync();
+ connection.closeAsync().get();
+
+ peer.waitForScriptToComplete(500, TimeUnit.SECONDS);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org