You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/11/22 17:58:40 UTC
qpid-jms git commit: QPIDJMS-226 Hang on failover recovery when
recovering temporary dest
Repository: qpid-jms
Updated Branches:
refs/heads/master 1a08b4409 -> 90f9b1f51
QPIDJMS-226 Hang on failover recovery when recovering temporary dest
Use the in progress provider to recreate the previously created
temporary destinations.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/90f9b1f5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/90f9b1f5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/90f9b1f5
Branch: refs/heads/master
Commit: 90f9b1f5181daeb0e9ad6d9baee679391faad762
Parents: 1a08b44
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Nov 22 12:58:24 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Nov 22 12:58:24 2016 -0500
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 4 +-
.../failover/FailoverIntegrationTest.java | 77 ++++++++++++++++++++
2 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/90f9b1f5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 82bdeff..bfd3c27 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -1151,7 +1151,9 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
request.sync();
for (JmsTemporaryDestination tempDestination : tempDestinations.values()) {
- createResource(tempDestination);
+ request = new ProviderFuture();
+ provider.create(tempDestination, request);
+ request.sync();
}
for (JmsSession session : sessions.values()) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/90f9b1f5/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index a082395..f90da49 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -44,6 +44,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
+import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
@@ -852,6 +853,82 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
}
}
+ @Test(timeout=20000)
+ public void testTempDestinationRecreatedAfterConnectionFailsOver() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ originalPeer.expectSaslAnonymousConnect();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ String dynamicAddress1 = "myTempTopicAddress";
+ originalPeer.expectTempTopicCreationAttach(dynamicAddress1);
+
+ originalPeer.dropAfterLastHandler();
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalURI.equals(remoteURI.toString())) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalURI.equals(remoteURI.toString())) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ // --- Post Failover Expectations of FinalPeer --- //
+
+ finalPeer.expectSaslAnonymousConnect();
+ finalPeer.expectBegin();
+ String dynamicAddress2 = "myTempTopicAddress2";
+ finalPeer.expectTempTopicCreationAttach(dynamicAddress2);
+
+ // Session is recreated after previous temporary destinations are recreated on failover.
+ finalPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryTopic tempTopic = session.createTemporaryTopic();
+
+ assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+ // Delete the temporary Topic and close the session.
+ finalPeer.expectDetach(true, true, true);
+ finalPeer.expectEnd();
+
+ tempTopic.delete();
+
+ session.close();
+
+ // Shut it down
+ finalPeer.expectClose();
+ connection.close();
+
+ originalPeer.waitForAllHandlersToComplete(2000);
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
@Test(timeout = 20000)
public void testFailoverEnforcesRequestTimeoutSession() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org