You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/21 19:57:56 UTC

[4/5] qpid-jms git commit: QPIDJMS-95: if a [transport] failure is reported while we are trying to open then fail the attempt to open the new connection

QPIDJMS-95: if a [transport] failure is reported while we are trying to open then fail the attempt to open the new connection


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9aabc27a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9aabc27a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9aabc27a

Branch: refs/heads/master
Commit: 9aabc27a4de515377cd149c41b6983435a241e3d
Parents: e46354b
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Aug 21 17:29:13 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Aug 21 18:45:13 2015 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  18 ++-
 .../failover/FailoverIntegrationTest.java       | 122 +++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  11 ++
 .../jms/test/testpeer/TestAmqpPeerRunner.java   |  12 ++
 4 files changed, 159 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3a2e709..7383352 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -126,6 +126,7 @@ public class AmqpProvider implements Provider, TransportListener {
     private final Transport protonTransport = Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
 
+    private AsyncResult connectionOpenRequest;
     private ScheduledFuture<?> nextIdleTimeoutCheck;
 
     /**
@@ -240,7 +241,6 @@ public class AmqpProvider implements Provider, TransportListener {
                 try {
                     checkClosed();
                     resource.visit(new JmsResourceVistor() {
-
                         @Override
                         public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
                             AmqpSession session = connection.createSession(sessionInfo);
@@ -296,8 +296,7 @@ public class AmqpProvider implements Provider, TransportListener {
                             }
 
                             connection = new AmqpConnection(AmqpProvider.this, protonConnection, authenticator, connectionInfo);
-                            connection.open(new AsyncResult() {
-
+                            AsyncResult wrappedOpenRequest = new AsyncResult() {
                                 @Override
                                 public void onSuccess() {
                                     fireConnectionEstablished();
@@ -313,7 +312,11 @@ public class AmqpProvider implements Provider, TransportListener {
                                 public boolean isComplete() {
                                     return request.isComplete();
                                 }
-                            });
+                            };
+
+                            connectionOpenRequest = wrappedOpenRequest;
+
+                            connection.open(wrappedOpenRequest);
                         }
 
                         @Override
@@ -838,6 +841,8 @@ public class AmqpProvider implements Provider, TransportListener {
     }
 
     void fireConnectionEstablished() {
+        //The request onSuccess calls this method
+        connectionOpenRequest = null;
 
         long now = System.currentTimeMillis();
         long deadline = protonTransport.tick(now);
@@ -854,6 +859,11 @@ public class AmqpProvider implements Provider, TransportListener {
     }
 
     void fireProviderException(Throwable ex) {
+        if(connectionOpenRequest != null) {
+            connectionOpenRequest.onFailure(ex);
+            connectionOpenRequest = null;
+        }
+
         ProviderListener listener = this.listener;
         if (listener != null) {
             listener.onConnectionFailure(IOExceptionSupport.create(ex));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
new file mode 100644
index 0000000..6a086b3
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverIntegrationTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class);
+
+    @Test(timeout = 20000)
+    public void testFailoverHandlesImmediateTransportDropAfterConnect() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer rejectingPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String rejectingURI = createPeerURI(rejectingPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Rejecting peer is at: {}", rejectingURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+            assertEquals("should not yet have connected to final peer", 1L, finalConnected.getCount());
+
+            // Set expectations on rejecting and final peer
+            rejectingPeer.expectSaslHeaderThenDrop();
+
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+
+            // Close the original peer and wait for things to shake out.
+            originalPeer.close();
+
+            rejectingPeer.waitForAllHandlersToComplete(2000);
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            //Shut it down
+            finalPeer.expectClose();
+            connection.close();
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private JmsConnection establishAnonymousConnecton(TestAmqpPeer origPeer, TestAmqpPeer rejectingPeer, TestAmqpPeer finalPeer) throws JMSException {
+        final String remoteURI = "failover:(" + createPeerURI(origPeer) + ","
+                                              + createPeerURI(rejectingPeer) + ","
+                                              + createPeerURI(finalPeer) + ")";
+
+        ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        Connection connection = factory.createConnection();
+
+        return (JmsConnection) connection;
+    }
+
+    private String createPeerURI(TestAmqpPeer peer) {
+        return "amqp://localhost:" + peer.getServerPort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 6f34ea5..62694d7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1562,4 +1562,15 @@ public class TestAmqpPeer implements AutoCloseable
             _firstAssertionError = ae;
         }
     }
+
+    public void expectSaslHeaderThenDrop() {
+        AmqpPeerRunnable exitAfterHeader = new AmqpPeerRunnable() {
+            @Override
+            public void run() {
+                _driverRunnable.exitReadLoopEarly();
+            }
+        };
+
+        addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, exitAfterHeader));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 17ec2f6..854e64c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -48,6 +48,7 @@ class TestAmqpPeerRunner implements Runnable
     private final Object _inputHandlingLock = new Object();
     private final TestFrameParser _testFrameParser;
     private volatile boolean _suppressReadExceptionOnClose;
+    private volatile boolean _exitReadLoopEarly;
 
     private volatile Throwable _throwable;
 
@@ -108,6 +109,13 @@ class TestAmqpPeerRunner implements Runnable
 
                     _testFrameParser.input(networkInputByteBuffer);
                 }
+
+                if(_exitReadLoopEarly)
+                {
+                    LOGGER.trace("Exiting read loop early");
+                    break;
+                }
+
                 LOGGER.trace("Attempting read");
                 attemptingRead = true;
             }
@@ -217,4 +225,8 @@ class TestAmqpPeerRunner implements Runnable
     public boolean isNeedClientCert() {
         return needClientCert;
     }
+
+    public void exitReadLoopEarly() {
+        _exitReadLoopEarly = true;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org