You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2021/10/18 12:58:22 UTC

[qpid-jms] branch main updated: QPIDJMS-549: use retained ref that isnt nulled during failover, and avoid multiple signals to the related code that let it NPE more than once

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f6bde0  QPIDJMS-549: use retained ref that isnt nulled during failover, and avoid multiple signals to the related code that let it NPE more than once
2f6bde0 is described below

commit 2f6bde0a388cd1431cacb154ba55a78c22e263af
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Oct 15 17:37:08 2021 +0100

    QPIDJMS-549: use retained ref that isnt nulled during failover, and avoid multiple signals to the related code that let it NPE more than once
---
 .../qpid/jms/provider/amqp/AmqpProvider.java       | 22 ++++--
 .../jms/provider/failover/FailoverProvider.java    |  5 +-
 ...hAmqpOpenProvidedServerListIntegrationTest.java | 88 ++++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       | 15 +++-
 4 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 8058678..d7f2f35 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -448,15 +448,21 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
                         AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
                         connectionRequest = new AsyncResult() {
+                            AtomicBoolean signalled = new AtomicBoolean();
+
                             @Override
                             public void onSuccess() {
-                                fireConnectionEstablished();
-                                request.onSuccess();
+                                if (signalled.compareAndSet(false, true)) {
+                                    fireConnectionEstablished();
+                                    request.onSuccess();
+                                }
                             }
 
                             @Override
                             public void onFailure(ProviderException result) {
-                                request.onFailure(result);
+                                if (signalled.compareAndSet(false, true)) {
+                                    request.onFailure(result);
+                                }
                             }
 
                             @Override
@@ -1470,12 +1476,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     @Override
     public List<URI> getAlternateURIs() {
-        List<URI> alternates = new ArrayList<>();
+        List<URI> alternates = null;
 
         if (connection != null) {
             // If there are failover servers in the open then we signal that to the listeners
             List<AmqpRedirect> failoverList = connection.getProperties().getFailoverServerList();
             if (!failoverList.isEmpty()) {
+                alternates = new ArrayList<>();
+
                 for (AmqpRedirect redirect : failoverList) {
                     try {
                         alternates.add(redirect.toURI());
@@ -1486,7 +1494,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
             }
         }
 
-        return alternates;
+        if (alternates != null) {
+            return alternates;
+        } else {
+            return Collections.emptyList();
+        }
     }
 
     public org.apache.qpid.proton.engine.Transport getProtonTransport() {
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 238e3fc..ce13e7b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -940,8 +940,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
         Provider provider = this.provider;
         if (provider != null) {
             return provider.getAlternateURIs();
+        } else {
+            return Collections.emptyList();
         }
-        return null;
     };
 
     @Override
@@ -1296,7 +1297,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                     LOG.debug("Request received error: {}", result.getMessage());
                     // If we managed to receive an Open frame it might contain
                     // a failover update so process it before handling the error.
-                    processAlternates(provider.getAlternateURIs());
+                    processAlternates(activeProvider.getAlternateURIs());
                     handleProviderFailure(activeProvider, ProviderExceptionSupport.createOrPassthroughFatal(result));
                 }
             } finally {
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
index 01ecbf8..742afcc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -41,6 +42,7 @@ import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
 import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.transports.TransportSupport;
 import org.apache.qpid.jms.util.PropertyUtil;
@@ -393,6 +395,92 @@ public class FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJ
         }
     }
 
+    @Test(timeout = 20000)
+    public void testFailoverHandlesRemoteCloseWithRedirectDuringConnectionSessionEstablishment() throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+             TestAmqpPeer backupPeer = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at {}: Backup peer is at: {}", primaryPeerURI, backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>();
+            redirectInfo.put(NETWORK_HOST, "localhost");
+            redirectInfo.put(PORT, backupPeer.getServerPort());
+
+            // Have the failover-server-list containing a third (non-existent) servers details
+            String thirdPeerTestHost = "test-host";
+            int thirdPeerTestPort = 45678;
+
+            Map<Symbol,Object> thirdPeerTestDetails = new HashMap<>();
+            thirdPeerTestDetails.put(NETWORK_HOST, thirdPeerTestHost);
+            thirdPeerTestDetails.put(PORT, thirdPeerTestPort);
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(thirdPeerTestDetails);
+
+            Map<Symbol,Object> forcingPeerConnectionProperties = new HashMap<Symbol, Object>();
+            forcingPeerConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectSaslAnonymous();
+            primaryPeer.expectOpen(true);
+            // Prepare a bare Open without any failure hint, suggesting success,
+            // but defer writing it until the Close is ready to send too.
+            primaryPeer.sendOpenFrameAfterLastAction(true, forcingPeerConnectionProperties);
+            // Then send a redirecting Close, prompting the Open to actually be written.
+            primaryPeer.remotelyCloseConnection(false, ConnectionError.REDIRECT, "Server is going away", redirectInfo);
+            primaryPeer.expectBegin(false);// From the connection-session, prompted by 'successful' Open.
+            primaryPeer.expectClose(false);
+
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(null, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (primaryPeerURI.equals(remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+
+                    if (backupPeerURI.equals(remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            connection.start();
+
+            primaryPeer.waitForAllHandlersToComplete(3000);
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+            assertFalse("Should not connect to primary peer", connectedToPrimary.await(10, TimeUnit.MILLISECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer, its advertised third test-details peer, and the peer it then redirected to.
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(new URI("amqp://" + thirdPeerTestHost + ":" + thirdPeerTestPort));
+            afterOpenFailoverURIs.add(backupPeerURI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     /*
      * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
      * system properties the redirect uses those properties to connect to the new host.
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 388323b..69581bf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -928,11 +928,24 @@ public class TestAmqpPeer implements AutoCloseable
     }
 
     public void sendPreemptiveServerOpenFrame() {
+        sendOpenFrameAfterLastAction(false, null);
+    }
+
+    public void sendOpenFrameAfterLastAction(boolean deferWrite, Map<Symbol, Object> serverProperties) {
         // Arrange to send the Open frame after the previous handler
         OpenFrame open = createOpenFrame();
+        if (serverProperties != null) {
+            open.setProperties(serverProperties);
+        }
 
         CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
-        comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null));
+
+        FrameSender openSender = new FrameSender(this, FrameType.AMQP, 0, open, null);
+        if (deferWrite) {
+            openSender.setDeferWrite(true);
+        }
+
+        comp.add(openSender);
     }
 
     public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org