You are viewing a plain text version of this content. The canonical link for it is not included in this plain text copy.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/24 16:36:13 UTC

qpid-jms git commit: QPIDJMS-95: some additional testing

Repository: qpid-jms
Updated Branches:
  refs/heads/master 0b2ef1281 -> d7f330218


QPIDJMS-95: some additional testing


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d7f33021
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d7f33021
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d7f33021

Branch: refs/heads/master
Commit: d7f3302185b5e3955e5a77e58cbdf39cd305cc4b
Parents: 0b2ef12
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Aug 24 15:23:36 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Aug 24 15:35:43 2015 +0100

----------------------------------------------------------------------
 .../failover/FailoverIntegrationTest.java       | 140 ++++++++++++++++++-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  43 +++---
 2 files changed, 162 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7f33021/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 6a086b3..1092bc1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -16,22 +16,33 @@
  */
 package org.apache.qpid.jms.provider.failover;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,10 +116,131 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    private JmsConnection establishAnonymousConnecton(TestAmqpPeer origPeer, TestAmqpPeer rejectingPeer, TestAmqpPeer finalPeer) throws JMSException {
-        final String remoteURI = "failover:(" + createPeerURI(origPeer) + ","
-                                              + createPeerURI(rejectingPeer) + ","
-                                              + createPeerURI(finalPeer) + ")";
+    @Test(timeout = 20000)
+    public void testFailoverHandlesTransportDropBeforeDispositionRecieived() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Connect to the first peer
+            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectBegin();
+
+            final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+            // Create session+producer, send a persistent message on auto-ack session for synchronous send
+            originalPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            originalPeer.expectSenderAttach();
+
+            final MessageProducer producer = session.createProducer(queue);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            final Message message = session.createTextMessage();
+
+            final CountDownLatch senderCompleted = new CountDownLatch(1);
+            final AtomicReference<Throwable> problem = new AtomicReference<Throwable>();
+
+            // Have the peer expect the message but NOT send any disposition for it
+            originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true);
+
+            Thread runner = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        producer.send(message);
+                    } catch (Throwable t) {
+                        problem.set(t);
+                        LOG.error("Problem in sending thread", t);
+                    }
+                    finally {
+                        senderCompleted.countDown();
+                    }
+                }
+            });
+            runner.start();
+
+            // Wait for the message to have been sent and received by peer
+            originalPeer.waitForAllHandlersToComplete(3000);
+
+            // Set the secondary peer to expect connection restoration, this time send disposition accepting the message
+            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true);
+
+            assertEquals("Should not yet have connected to final peer", 1L, finalConnected.getCount());
+            assertEquals("Sender thread should not yet have completed", 1L, senderCompleted.getCount());
+
+            // Close the original peer to provoke reconnect, while send() is still outstanding
+            originalPeer.close();
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+            boolean await = senderCompleted.await(5, TimeUnit.SECONDS);
+            Throwable t = problem.get();
+            assertTrue("Sender thread should have completed. Problem: " + t, await);
+
+            //Shut it down
+            finalPeer.expectClose();
+            connection.close();
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
+        if(peers.length == 0) {
+            throw new IllegalArgumentException("No test peers were given, at least 1 required");
+        }
+
+        String remoteURI = "failover:(";
+        boolean first = true;
+        for(TestAmqpPeer peer : peers) {
+            if (!first) {
+                remoteURI += ",";
+            }
+            remoteURI += createPeerURI(peer);
+            first = false;
+        }
+        remoteURI += ")?failover.maxReconnectAttempts=10";
 
         ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
         Connection connection = factory.createConnection();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7f33021/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 62694d7..7604439 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1284,13 +1284,19 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
     {
-        expectTransfer(expectedPayloadMatcher, nullValue(), false, new Accepted(), true);
+        expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true);
     }
 
-    //TODO: fix responseState to only admit applicable types.
     public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
                                ListDescribedType responseState, boolean responseSettled)
     {
+        expectTransfer(expectedPayloadMatcher, stateMatcher, settled, true, responseState, responseSettled);
+    }
+
+    //TODO: fix responseState to only admit applicable types.
+    public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
+                               boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled)
+    {
         Matcher<Boolean> settledMatcher = null;
         if(settled)
         {
@@ -1306,23 +1312,26 @@ public class TestAmqpPeer implements AutoCloseable
         transferMatcher.withSettled(settledMatcher);
         transferMatcher.withState(stateMatcher);
 
-        final DispositionFrame dispositionResponse = new DispositionFrame()
-                                                   .setRole(Role.RECEIVER)
-                                                   .setSettled(responseSettled)
-                                                   .setState(responseState);
+        if(sendResponseDisposition) {
+            final DispositionFrame dispositionResponse = new DispositionFrame()
+                                                       .setRole(Role.RECEIVER)
+                                                       .setSettled(responseSettled)
+                                                       .setState(responseState);
 
-        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
-        final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
-        dispositionFrameSender.setValueProvider(new ValueProvider()
-        {
-            @Override
-            public void setValues()
+            // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
+            dispositionFrameSender.setValueProvider(new ValueProvider()
             {
-                dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
-                dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
-            }
-        });
-        transferMatcher.onCompletion(dispositionFrameSender);
+                @Override
+                public void setValues()
+                {
+                    dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
+                    dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
+                }
+            });
+
+            transferMatcher.onCompletion(dispositionFrameSender);
+        }
 
         addHandler(transferMatcher);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org