You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/10/14 00:19:03 UTC

qpid-jms git commit: QPIDJMS-125 Remove extra call to TX rollback on session close and add some tests in failover for tx recreate on reconnect. Fix some issues in the failover integration test.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 0fd981ca1 -> 845f75db6


QPIDJMS-125 Remove extra call to TX rollback on session close and add
some tests in failover for tx recreate on reconnect.  Fix some issues in
the failover integration test. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/845f75db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/845f75db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/845f75db

Branch: refs/heads/master
Commit: 845f75db6ca3c2a67274aafff5cfc65b465705c6
Parents: 0fd981c
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 13 18:18:53 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 13 18:18:53 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsSession.java    |   4 -
 .../failover/FailoverIntegrationTest.java       | 130 ++++++++++++++++++-
 2 files changed, 128 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/845f75db/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index eee84b8..c0a361b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -229,10 +229,6 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa
     protected void doClose() throws JMSException {
         boolean interrupted = Thread.interrupted();
         shutdown();
-        try {
-            transactionContext.rollback();
-        } catch (JMSException e) {
-        }
         connection.removeSession(sessionInfo);
         connection.destroyResource(sessionInfo);
         if (interrupted) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/845f75db/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index a52cc79..f5063c9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -54,12 +54,18 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.jms.util.StopWatch;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
@@ -86,6 +92,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
                 fail("Should have thrown JMSSecurityException: " + ex);
             }
 
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -148,7 +156,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
 
-            //Shut it down
+            // Shut it down
             finalPeer.expectClose();
             connection.close();
             finalPeer.waitForAllHandlersToComplete(1000);
@@ -258,7 +266,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             Throwable t = problem.get();
             assertTrue("Sender thread should have completed. Problem: " + t, await);
 
-            //Shut it down
+            // Shut it down
             finalPeer.expectClose();
             connection.close();
             finalPeer.waitForAllHandlersToComplete(1000);
@@ -320,6 +328,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
 
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
             finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -348,6 +360,12 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             String message = "Initial connect should not have delayed for the specified initialReconnectDelay." + "Elapsed=" + taken + ", delay=" + delay;
             assertTrue(message,  taken < delay);
             assertTrue("Connection took longer than reasonable: " + taken, taken < 5000);
+
+            // Shut it down
+            originalPeer.expectClose();
+            connection.close();
+
+            originalPeer.waitForAllHandlersToComplete(2000);
         }
     }
 
@@ -416,6 +434,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             consumer.close();
 
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
             finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -488,6 +510,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Closing consumer");
             consumer.close();
 
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
             finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -562,6 +588,10 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Closing consumer");
             consumer.close();
 
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
             finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
@@ -633,6 +663,12 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             assertFalse(queueView.hasMoreElements());
 
             browser.close();
+
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
@@ -678,10 +714,100 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
                 LOG.info("Test caught expected error: {}", ide.getMessage());
             }
 
+            // Shut it down
+            testPeer.expectClose();
+            connection.close();
+
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
 
+    @Test(timeout=20000)
+    public void testTxRecreatedAfterConnectionFailsOver() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+            TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            originalPeer.expectBegin();
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            originalPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a Declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+            TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
+            declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
+            originalPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of FinalPeer --- //
+
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+            finalPeer.expectTransfer(declareMatcher, nullValue(), false, new Declared().setTxnId(txnId), true);
+
+            // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId,
+            // and reply with accepted and settled disposition to indicate the rollback succeeded.
+            Discharge discharge = new Discharge();
+            discharge.setFail(true);
+            discharge.setTxnId(txnId);
+            TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher();
+            dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge));
+            finalPeer.expectTransfer(dischargeMatcher, nullValue(), false, new Accepted(), true);
+            finalPeer.expectEnd();
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            session.close();
+
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
+            originalPeer.waitForAllHandlersToComplete(2000);
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
         return establishAnonymousConnecton(null, peers);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org