You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/24 16:36:13 UTC
qpid-jms git commit: QPIDJMS-95: some additional testing
Repository: qpid-jms
Updated Branches:
refs/heads/master 0b2ef1281 -> d7f330218
QPIDJMS-95: some additional testing
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d7f33021
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d7f33021
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d7f33021
Branch: refs/heads/master
Commit: d7f3302185b5e3955e5a77e58cbdf39cd305cc4b
Parents: 0b2ef12
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Aug 24 15:23:36 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Aug 24 15:35:43 2015 +0100
----------------------------------------------------------------------
.../failover/FailoverIntegrationTest.java | 140 ++++++++++++++++++-
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 43 +++---
2 files changed, 162 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7f33021/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 6a086b3..1092bc1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -16,22 +16,33 @@
*/
package org.apache.qpid.jms.provider.failover;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,10 +116,131 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
}
}
- private JmsConnection establishAnonymousConnecton(TestAmqpPeer origPeer, TestAmqpPeer rejectingPeer, TestAmqpPeer finalPeer) throws JMSException {
- final String remoteURI = "failover:(" + createPeerURI(origPeer) + ","
- + createPeerURI(rejectingPeer) + ","
- + createPeerURI(finalPeer) + ")";
+ @Test(timeout = 20000)
+ public void testFailoverHandlesTransportDropBeforeDispositionRecieived() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Resolve the URIs of the peer to connect to first and of the peer to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ // Connect to the first peer: script its side of the handshake (SASL anonymous connect, then a begin) before connecting
+ originalPeer.expectSaslAnonymousConnect();
+ originalPeer.expectBegin();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalURI.equals(remoteURI.toString())) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalURI.equals(remoteURI.toString())) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ // Create session+producer, send a persistent message on auto-ack session for synchronous send
+ originalPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ originalPeer.expectSenderAttach();
+
+ final MessageProducer producer = session.createProducer(queue);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ final Message message = session.createTextMessage();
+
+ final CountDownLatch senderCompleted = new CountDownLatch(1);
+ final AtomicReference<Throwable> problem = new AtomicReference<Throwable>();
+
+ // Have the peer expect the message but NOT send any disposition for it (4th argument, sendResponseDisposition, is false)
+ originalPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, true);
+
+ Thread runner = new Thread(new Runnable() { // send off the test thread; per the assertions below it is expected to stay outstanding until failover completes
+ @Override
+ public void run() {
+ try {
+ producer.send(message);
+ } catch (Throwable t) {
+ problem.set(t);
+ LOG.error("Problem in sending thread", t);
+ }
+ finally {
+ senderCompleted.countDown();
+ }
+ }
+ });
+ runner.start();
+
+ // Wait for the message to have been sent and received by peer
+ originalPeer.waitForAllHandlersToComplete(3000);
+
+ // Set the secondary peer to expect connection restoration (connect, two begins, sender attach, then the same transfer), this time send disposition accepting the message
+ finalPeer.expectSaslAnonymousConnect();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+ finalPeer.expectTransfer(messageMatcher, nullValue(), false, true, new Accepted(), true);
+
+ assertEquals("Should not yet have connected to final peer", 1L, finalConnected.getCount());
+ assertEquals("Sender thread should not yet have completed", 1L, senderCompleted.getCount());
+
+ // Close the original peer to provoke reconnect, while send() is still outstanding
+ originalPeer.close();
+
+ assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+ boolean await = senderCompleted.await(5, TimeUnit.SECONDS);
+ Throwable t = problem.get(); // captured only to enrich the failure message below
+ assertTrue("Sender thread should have completed. Problem: " + t, await);
+
+ // Shut it down: the final peer expects the close issued by connection.close()
+ finalPeer.expectClose();
+ connection.close();
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
+ if(peers.length == 0) {
+ throw new IllegalArgumentException("No test peers were given, at least 1 required");
+ }
+
+ String remoteURI = "failover:(";
+ boolean first = true;
+ for(TestAmqpPeer peer : peers) {
+ if (!first) {
+ remoteURI += ",";
+ }
+ remoteURI += createPeerURI(peer);
+ first = false;
+ }
+ remoteURI += ")?failover.maxReconnectAttempts=10";
ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = factory.createConnection();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d7f33021/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 62694d7..7604439 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1284,13 +1284,19 @@ public class TestAmqpPeer implements AutoCloseable
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
{
- expectTransfer(expectedPayloadMatcher, nullValue(), false, new Accepted(), true);
+ expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true);
}
- //TODO: fix responseState to only admit applicable types.
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
ListDescribedType responseState, boolean responseSettled)
{
+ expectTransfer(expectedPayloadMatcher, stateMatcher, settled, true, responseState, responseSettled);
+ }
+
+ //TODO: fix responseState to only admit applicable types.
+ public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
+ boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled)
+ {
Matcher<Boolean> settledMatcher = null;
if(settled)
{
@@ -1306,23 +1312,26 @@ public class TestAmqpPeer implements AutoCloseable
transferMatcher.withSettled(settledMatcher);
transferMatcher.withState(stateMatcher);
- final DispositionFrame dispositionResponse = new DispositionFrame()
- .setRole(Role.RECEIVER)
- .setSettled(responseSettled)
- .setState(responseState);
+ if(sendResponseDisposition) {
+ final DispositionFrame dispositionResponse = new DispositionFrame()
+ .setRole(Role.RECEIVER)
+ .setSettled(responseSettled)
+ .setState(responseState);
- // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
- final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
- dispositionFrameSender.setValueProvider(new ValueProvider()
- {
- @Override
- public void setValues()
+ // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
+ dispositionFrameSender.setValueProvider(new ValueProvider()
{
- dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
- dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
- }
- });
- transferMatcher.onCompletion(dispositionFrameSender);
+ @Override
+ public void setValues()
+ {
+ dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
+ dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
+ }
+ });
+
+ transferMatcher.onCompletion(dispositionFrameSender);
+ }
addHandler(transferMatcher);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org