You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/11/22 17:58:40 UTC

qpid-jms git commit: QPIDJMS-226 Hang on failover recovery when recovering temporary dest

Repository: qpid-jms
Updated Branches:
  refs/heads/master 1a08b4409 -> 90f9b1f51


QPIDJMS-226 Hang on failover recovery when recovering temporary dest

Use the in progress provider to recreate the previously created
temporary destinations.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/90f9b1f5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/90f9b1f5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/90f9b1f5

Branch: refs/heads/master
Commit: 90f9b1f5181daeb0e9ad6d9baee679391faad762
Parents: 1a08b44
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Nov 22 12:58:24 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Nov 22 12:58:24 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  4 +-
 .../failover/FailoverIntegrationTest.java       | 77 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/90f9b1f5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 82bdeff..bfd3c27 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1151,7 +1151,9 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         request.sync();
 
         for (JmsTemporaryDestination tempDestination : tempDestinations.values()) {
-            createResource(tempDestination);
+            request = new ProviderFuture();
+            provider.create(tempDestination, request);
+            request.sync();
         }
 
         for (JmsSession session : sessions.values()) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/90f9b1f5/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index a082395..f90da49 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -44,6 +44,7 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
+import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
@@ -852,6 +853,82 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout=20000)
+    public void testTempDestinationRecreatedAfterConnectionFailsOver() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            String dynamicAddress1 = "myTempTopicAddress";
+            originalPeer.expectTempTopicCreationAttach(dynamicAddress1);
+
+            originalPeer.dropAfterLastHandler();
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            String dynamicAddress2 = "myTempTopicAddress2";
+            finalPeer.expectTempTopicCreationAttach(dynamicAddress2);
+
+            // Session is recreated after previous temporary destinations are recreated on failover.
+            finalPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            TemporaryTopic tempTopic = session.createTemporaryTopic();
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            // Delete the temporary Topic and close the session.
+            finalPeer.expectDetach(true, true, true);
+            finalPeer.expectEnd();
+
+            tempTopic.delete();
+
+            session.close();
+
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
+            originalPeer.waitForAllHandlersToComplete(2000);
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     @Test(timeout = 20000)
     public void testFailoverEnforcesRequestTimeoutSession() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org