You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:03 UTC
[2/7] qpid-jms git commit: QPIDJMS-207 Adds support for Asynchronous
JMS 2.0 sends.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index 2fa2644..d18e95b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -67,7 +67,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
}
@Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+ public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
try {
if (pendingDelivery != null && pendingDelivery.remotelySettled()) {
DeliveryState state = pendingDelivery.getRemoteState();
@@ -105,7 +105,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
}
}
- super.processDeliveryUpdates(provider);
+ super.processDeliveryUpdates(provider, delivery);
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 229e61f..5912f7f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -24,10 +24,12 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo;
import org.apache.qpid.jms.meta.JmsResource;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpEventSink;
+import org.apache.qpid.jms.provider.amqp.AmqpExceptionBuilder;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.provider.amqp.AmqpResource;
import org.apache.qpid.jms.provider.amqp.AmqpResourceParent;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory;
* @param <INFO> The Type of JmsResource used to describe the target resource.
* @param <ENDPOINT> The AMQP Endpoint that the target resource encapsulates.
*/
-public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink {
+public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink, AmqpExceptionBuilder {
private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class);
@@ -97,7 +99,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
return request.isComplete();
}
- }, getRequestTimeout(), new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out"));
+ }, getRequestTimeout(), this);
}
}
@@ -119,7 +121,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
}
@Override
- public void processDeliveryUpdates(AmqpProvider provider) throws IOException {
+ public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
// No implementation needed here for this event.
}
@@ -185,6 +187,11 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
getRequest().onFailure(openError);
}
+ @Override
+ public Exception createException() {
+ return new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out");
+ }
+
//----- Implementation methods used to customize the build process -------//
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index e6e36df..2490b19 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -801,6 +801,24 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
}
@Override
+ public void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) {
+ if (closingConnection.get() || closed.get() || failed.get()) {
+ return;
+ }
+
+ listener.onCompletedMessageSend(envelope);
+ }
+
+ @Override
+ public void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, Throwable cause) {
+ if (closingConnection.get() || closed.get() || failed.get()) {
+ return;
+ }
+
+ listener.onFailedMessageSend(envelope, cause);
+ }
+
+ @Override
public void onConnectionFailure(final IOException ex) {
if (closingConnection.get() || closed.get() || failed.get()) {
return;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index c8914ae..a8fcf2d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.IllegalStateException;
@@ -932,4 +933,112 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ @Test(timeout=20000)
+ public void testMessageListenerCallsConnectionCloseThrowsIllegalStateException() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+ connection.start();
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ testPeer.expectDisposition(true, new AcceptedMatcher());
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message m) {
+ try {
+ LOG.debug("Async consumer got Message: {}", m);
+ connection.close();
+ } catch (Exception ex) {
+ asyncError.set(ex);
+ }
+
+ latch.countDown();
+ }
+ });
+
+ boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+ assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+ assertNotNull(asyncError.get());
+ assertTrue(asyncError.get() instanceof IllegalStateException);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout=20000)
+ public void testMessageListenerCallsSessionCloseThrowsIllegalStateException() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+ connection.start();
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ testPeer.expectDisposition(true, new AcceptedMatcher());
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message m) {
+ try {
+ LOG.debug("Async consumer got Message: {}", m);
+ session.close();
+ } catch (Exception ex) {
+ asyncError.set(ex);
+ }
+
+ latch.countDown();
+ }
+ });
+
+ boolean await = latch.await(3000, TimeUnit.MILLISECONDS);
+ assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await);
+
+ assertNotNull(asyncError.get());
+ assertTrue(asyncError.get() instanceof IllegalStateException);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
index 5485857..2ed9f41 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java
@@ -20,8 +20,15 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
@@ -30,8 +37,11 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.qpid.jms.JmsCompletionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.ListDescribedType;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -47,12 +57,16 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transaction.TxnCapability;
import org.hamcrest.Matcher;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test MessageProducers created using various configuration of the presettle options
*/
public class PresettledProducerIntegrationTest extends QpidJmsTestCase {
+ private static final Logger LOG = LoggerFactory.getLogger(PresettledProducerIntegrationTest.class);
+
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
private final Symbol[] serverCapabilities = new Symbol[] { ANONYMOUS_RELAY };
@@ -419,4 +433,221 @@ public class PresettledProducerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+ //----- Test the jms.presettleAll with asynchronous completion -----------//
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionPresettleAllSendToTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionPresettleAllSendToQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testsyncCompletionPresettleAllAnonymousSendToTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testsyncCompletionPresettleAllAnonymousSendToQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleAll=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Queue.class);
+ }
+
+ //----- Test the jms.presettleProducers with asynchronous completion -----//
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionPresettleProducersTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionPresettleProducersQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Queue.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionPresettleProducersAnonymousTopic() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Topic.class);
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionPresettleProducersAnonymousQueue() throws Exception {
+ String presettleConfig = "?jms.presettlePolicy.presettleProducers=true";
+ doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Queue.class);
+ }
+
+ //----- Asynchronous Completion test method implementation ---------------//
+
+ private void doTestAsyncCompletionProducerWithPresettleOptions(String uriOptions, boolean transacted, boolean anonymous, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
+ doTestAsyncCompletionProducerWithPresettleOptions(uriOptions, transacted, anonymous, true, senderSettled, transferSettled, destType);
+ }
+
+ private void doTestAsyncCompletionProducerWithPresettleOptions(String uriOptions, boolean transacted, boolean anonymous, boolean relaySupported, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer, uriOptions, relaySupported ? serverCapabilities : null, null);
+ testPeer.expectBegin();
+
+ Session session = null;
+ Binary txnId = null;
+
+ if (transacted) {
+ // Expect the session, with an immediate link to the transaction coordinator
+ // using a target with the expected capabilities only.
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+ txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+ testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+ testPeer.expectDeclare(txnId);
+
+ session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ } else {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ Destination destination = null;
+ if (destType == Queue.class) {
+ destination = session.createQueue("MyQueue");
+ } else if (destType == Topic.class) {
+ destination = session.createTopic("MyTopis");
+ } else if (destType == TemporaryQueue.class) {
+ String dynamicAddress = "myTempQueueAddress";
+ testPeer.expectTempQueueCreationAttach(dynamicAddress);
+ destination = session.createTemporaryQueue();
+ } else if (destType == TemporaryTopic.class) {
+ String dynamicAddress = "myTempTopicAddress";
+ testPeer.expectTempTopicCreationAttach(dynamicAddress);
+ destination = session.createTemporaryTopic();
+ } else {
+ fail("unexpected type");
+ }
+
+ if (senderSettled) {
+ testPeer.expectSettledSenderAttach();
+ } else {
+ testPeer.expectSenderAttach();
+ }
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = null;
+ if (anonymous) {
+ producer = (JmsMessageProducer) session.createProducer(null);
+ } else {
+ producer = (JmsMessageProducer) session.createProducer(destination);
+ }
+
+ // Create and transfer a new message
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ headersMatcher.withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ Matcher<?> stateMatcher = nullValue();
+ if (transacted) {
+ stateMatcher = new TransactionalStateMatcher();
+ ((TransactionalStateMatcher) stateMatcher).withTxnId(equalTo(txnId));
+ ((TransactionalStateMatcher) stateMatcher).withOutcome(nullValue());
+ }
+
+ ListDescribedType responseState = new Accepted();
+ if (transacted) {
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+ }
+
+ if (transferSettled) {
+ testPeer.expectTransfer(messageMatcher, stateMatcher, true, false, responseState, false);
+ } else {
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true);
+ }
+
+ if (anonymous && !relaySupported) {
+ testPeer.expectDetach(true, true, true);
+ }
+
+ Message message = session.createTextMessage();
+
+ if (anonymous) {
+ producer.send(destination, message, listener);
+ } else {
+ producer.send(message, listener);
+ }
+
+ if (transacted) {
+ testPeer.expectDischarge(txnId, true);
+ }
+
+ testPeer.expectClose();
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+ assertEquals(1, listener.successCount);
+ assertEquals(0, listener.errorCount);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ private class TestJmsCompletionListener implements JmsCompletionListener {
+
+ private final CountDownLatch completed;
+
+ public volatile int successCount;
+ public volatile int errorCount;
+
+ public volatile Message message;
+ public volatile Exception exception;
+
+ public TestJmsCompletionListener() {
+ this(1);
+ }
+
+ public TestJmsCompletionListener(int expected) {
+ this.completed = new CountDownLatch(expected);
+ }
+
+ public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+ return completed.await(timeout, units);
+ }
+
+ @Override
+ public void onCompletion(Message message) {
+ LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+ this.message = message;
+ this.successCount++;
+
+ completed.countDown();
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+
+ this.message = message;
+ this.exception = exception;
+ this.errorCount++;
+
+ completed.countDown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 6c53398..1e69eb8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -40,7 +40,10 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
@@ -54,9 +57,11 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.qpid.jms.JmsCompletionListener;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.message.foreign.ForeignJmsMessage;
@@ -68,10 +73,13 @@ import org.apache.qpid.jms.test.testpeer.ListDescribedType;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
+import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.Modified;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
+import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
@@ -1000,6 +1008,79 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testRemotelyEndProducerCompletesAsyncSends() throws Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final AtomicBoolean producerClosed = new AtomicBoolean();
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onProducerClosed(MessageProducer producer, Exception exception) {
+ producerClosed.set(true);
+ }
+ });
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end the session afterwards.
+ testPeer.expectSenderAttach();
+
+ Queue queue = session.createQueue("myQueue");
+ // TODO - Can revert to just MessageProducer once JMS 2.0 API is used
+ final JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ Message message = session.createTextMessage("content");
+
+ final int MSG_COUNT = 3;
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+ }
+
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ producer.send(message, listener);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ // Verify the producer gets marked closed
+ assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertEquals(MSG_COUNT, listener.errorCount);
+
+ // Verify the session is now marked closed
+ try {
+ producer.getDeliveryMode();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ String errorMessage = jmsise.getCause().getMessage();
+ assertTrue(errorMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
+ assertTrue(errorMessage.contains(BREAD_CRUMB));
+ }
+
+ assertTrue("Producer closed callback didn't trigger", producerClosed.get());
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ producer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testRemotelyCloseConnectionDuringSyncSend() throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
@@ -1150,6 +1231,50 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testAsyncCompletionGetsTimedOutErrorWhenNoDispostionArrives() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(500);
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+
+ Message message = session.createTextMessage("text");
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+
+ // Expect the producer to attach and grant it some credit, it should send
+ // a transfer which we will not send any response for which should cause the
+ // send operation to time out.
+ testPeer.expectSenderAttach();
+ testPeer.expectTransferButDoNotRespond(messageMatcher);
+ testPeer.expectClose();
+
+ // TODO - Can revert to plain JMS once 2.0 is supported.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+ try {
+ producer.send(message, listener);
+ } catch (Throwable error) {
+ LOG.info("Caught expected error: {}", error.getMessage());
+ fail("Send should not fail for async.");
+ }
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNotNull(listener.exception);
+ assertTrue(listener.exception instanceof JmsSendTimedOutException);
+ assertNotNull(listener.message);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testSyncSendMessageRejected() throws Exception {
doSyncSendMessageNotAcceptedTestImpl(new Rejected());
}
@@ -1709,6 +1834,430 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testAsyncCompletionAfterSendMessageGetDispoation() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+ testPeer.expectClose();
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+ producer.send(message, listener);
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionResetsBytesMessage() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create and transfer a new message
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+ testPeer.expectClose();
+
+ Binary payload = new Binary(new byte[] {1, 2, 3, 4});
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(payload.getArray());
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+
+ producer.send(message, listener);
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof BytesMessage);
+
+ BytesMessage completed = (BytesMessage) listener.message;
+ assertEquals(payload.getLength(), completed.getBodyLength());
+ byte[] data = new byte[payload.getLength()];
+ completed.readBytes(data);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMessageRejected() throws Exception {
+ doAsyncCompletionSendMessageNotAcceptedTestImpl(new Rejected());
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMessageReleased() throws Exception {
+ doAsyncCompletionSendMessageNotAcceptedTestImpl(new Released());
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMessageModifiedDeliveryFailed() throws Exception {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+
+ doAsyncCompletionSendMessageNotAcceptedTestImpl(modified);
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMessageModifiedUndeliverable() throws Exception {
+ Modified modified = new Modified();
+ modified.setUndeliverableHere(true);
+
+ doAsyncCompletionSendMessageNotAcceptedTestImpl(modified);
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSendMessageModifiedDeliveryFailedUndeliverable() throws Exception {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ modified.setUndeliverableHere(true);
+
+ doAsyncCompletionSendMessageNotAcceptedTestImpl(modified);
+ }
+
+ private void doAsyncCompletionSendMessageNotAcceptedTestImpl(ListDescribedType responseState) throws JMSException, InterruptedException, Exception, IOException {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+
+ final CountDownLatch asyncError = new CountDownLatch(1);
+
+ connection.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ LOG.debug("ExceptionListener got error: {}", exception.getMessage());
+ asyncError.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create a second producer which allows for a safe wait for credit for the
+ // first producer without the need for a sleep. Otherwise the first producer
+ // might not do an actual async send due to not having received credit yet.
+ session.createProducer(queue);
+
+ Message message = session.createTextMessage("content");
+
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, responseState, true);
+
+ assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener();
+ try {
+ producer.send(message, listener);
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNotNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+ testPeer.expectClose();
+
+ listener = new TestJmsCompletionListener();
+ try {
+ producer.send(message, listener);
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSessionCloseThrowsIllegalStateException() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+ testPeer.expectClose();
+
+ final AtomicReference<JMSException> closeError = new AtomicReference<JMSException>(null);
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+ @Override
+ public void onCompletion(Message message) {
+
+ try {
+ session.close();
+ } catch (JMSException jmsEx) {
+ closeError.set(jmsEx);
+ }
+
+ super.onCompletion(message);
+ };
+ };
+
+ producer.send(message, listener);
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+ assertNotNull(closeError.get());
+ assertTrue(closeError.get() instanceof IllegalStateException);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionConnectionCloseThrowsIllegalStateException() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher());
+ testPeer.expectClose();
+
+ final AtomicReference<JMSException> closeError = new AtomicReference<JMSException>(null);
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+ @Override
+ public void onCompletion(Message message) {
+
+ try {
+ connection.close();
+ } catch (JMSException jmsEx) {
+ closeError.set(jmsEx);
+ }
+
+ super.onCompletion(message);
+ };
+ };
+
+ producer.send(message, listener);
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertNotNull(closeError.get());
+ assertTrue(closeError.get() instanceof IllegalStateException);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSessionCommitThrowsIllegalStateException() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ testPeer.expectSenderAttach();
+
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ final AtomicReference<JMSException> commitError = new AtomicReference<JMSException>(null);
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+ @Override
+ public void onCompletion(Message message) {
+
+ try {
+ session.commit();
+ } catch (JMSException jmsEx) {
+ commitError.set(jmsEx);
+ }
+
+ super.onCompletion(message);
+ };
+ };
+
+ producer.send(message, listener);
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertNotNull(commitError.get());
+ assertTrue(commitError.get() instanceof IllegalStateException);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAsyncCompletionSessionRollbackThrowsIllegalStateException() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectCoordinatorAttach();
+
+ // First expect an unsettled 'declare' transfer to the txn coordinator, and
+ // reply with a declared disposition state containing the txnId.
+ Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+ testPeer.expectDeclare(txnId);
+
+ testPeer.expectSenderAttach();
+
+ final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue("myQueue");
+
+ // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added.
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher();
+ stateMatcher.withTxnId(equalTo(txnId));
+ stateMatcher.withOutcome(nullValue());
+ TransactionalState txState = new TransactionalState();
+ txState.setTxnId(txnId);
+ txState.setOutcome(new Accepted());
+
+ testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true);
+ testPeer.expectDischarge(txnId, true);
+ testPeer.expectClose();
+
+ final AtomicReference<JMSException> rollback = new AtomicReference<JMSException>(null);
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new TestJmsCompletionListener() {
+
+ @Override
+ public void onCompletion(Message message) {
+
+ try {
+ session.rollback();
+ } catch (JMSException jmsEx) {
+ rollback.set(jmsEx);
+ }
+
+ super.onCompletion(message);
+ };
+ };
+
+ producer.send(message, listener);
+
+ assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertNotNull(rollback.get());
+ assertTrue(rollback.get() instanceof IllegalStateException);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -1831,4 +2380,47 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+ private class TestJmsCompletionListener implements JmsCompletionListener {
+
+ private final CountDownLatch completed;
+
+ public volatile int successCount;
+ public volatile int errorCount;
+
+ public volatile Message message;
+ public volatile Exception exception;
+
+ public TestJmsCompletionListener() {
+ this(1);
+ }
+
+ public TestJmsCompletionListener(int expected) {
+ this.completed = new CountDownLatch(expected);
+ }
+
+ public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+ return completed.await(timeout, units);
+ }
+
+ @Override
+ public void onCompletion(Message message) {
+ LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+ this.message = message;
+ this.successCount++;
+
+ completed.countDown();
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+
+ this.message = message;
+ this.exception = exception;
+ this.errorCount++;
+
+ completed.countDown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 7bf35e4..34d60f1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -57,8 +57,10 @@ import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import org.apache.qpid.jms.JmsCompletionListener;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsMessageProducer;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
@@ -1446,7 +1448,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- //DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+ // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
@@ -1460,12 +1462,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
- //Create an anonymous producer
+ // Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
- //Expect a new message sent by the above producer to cause creation of a new
- //sender link to the given destination, then closing the link after the message is sent.
+ // Expect a new message sent by the above producer to cause creation of a new
+ // sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
@@ -1484,7 +1486,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
Message message = session.createMessage();
producer.send(dest, message);
- //Repeat the send and observe another attach->transfer->detach.
+ // Repeat the send and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
@@ -1675,6 +1677,78 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testRemotelyEndSessionWithProducerCompletesAsyncSends() throws Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final AtomicBoolean sessionClosed = new AtomicBoolean();
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onSessionClosed(Session session, Exception exception) {
+ sessionClosed.set(true);
+ }
+ });
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end the session afterwards.
+ testPeer.expectSenderAttach();
+
+ Queue queue = session.createQueue("myQueue");
+ // TODO - Can revert to just MessageProducer once JMS 2.0 API is used
+ final JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue);
+
+ Message message = session.createTextMessage("content");
+
+ final int MSG_COUNT = 3;
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
+ }
+
+ testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+ TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ producer.send(message, listener);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ // Verify the producer gets marked closed
+ assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS));
+ assertEquals(MSG_COUNT, listener.errorCount);
+ assertEquals(0, listener.successCount);
+
+ // Verify the session is now marked closed
+ try {
+ session.getAcknowledgeMode();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ String errorMessage = jmsise.getCause().getMessage();
+ assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+ assertTrue(errorMessage.contains(BREAD_CRUMB));
+ }
+
+ assertTrue("Session closed callback didn't trigger", sessionClosed.get());
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ producer.close();
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
public void testRemotelyEndSessionWithConsumer() throws Exception {
final String BREAD_CRUMB = "ErrorMessage";
@@ -1920,4 +1994,34 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
connection.close();
}
}
+
+ private class TestJmsCompletionListener implements JmsCompletionListener {
+
+ private final CountDownLatch completed;
+
+ public volatile int successCount;
+ public volatile int errorCount;
+
+ public TestJmsCompletionListener(int expected) {
+ completed = new CountDownLatch(expected);
+ }
+
+ public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
+ return completed.await(timeout, units);
+ }
+
+ @Override
+ public void onCompletion(Message message) {
+ LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
+ successCount++;
+ completed.countDown();
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
+ errorCount++;
+ completed.countDown();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
index 80a6e6a..7117e9f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java
@@ -22,35 +22,63 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import org.apache.qpid.jms.JmsCompletionListener;
+import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionTestSupport;
import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsMessageProducer;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsSession;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.provider.mock.MockRemotePeer;
+import org.apache.qpid.jms.test.Wait;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test basic functionality around JmsConnection
*/
public class JmsMessageProducerTest extends JmsConnectionTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsMessageProducerTest.class);
+
+ private final MyCompletionListener completionListener = new MyCompletionListener();
private JmsSession session;
+ private final MockRemotePeer remotePeer = new MockRemotePeer();
@Override
@Before
public void setUp() throws Exception {
super.setUp();
+ remotePeer.start();
connection = createConnectionToMockProvider();
session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ remotePeer.shutdown();
+ super.tearDown();
+ }
+
@Test(timeout = 10000)
public void testMultipleCloseCallsNoErrors() throws Exception {
MessageProducer producer = session.createProducer(null);
@@ -106,7 +134,7 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
@Test(timeout = 10000)
public void testAnonymousProducerThrowsUOEWhenExplictDestinationNotProvided() throws Exception {
- MessageProducer producer = session.createProducer(null);
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
Message message = Mockito.mock(Message.class);
try {
@@ -117,17 +145,31 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
}
try {
+ producer.send(message, completionListener);
+ fail("Expected exception not thrown");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
+
+ try {
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
+
+ try {
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
+ fail("Expected exception not thrown");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
}
@Test(timeout = 10000)
public void testExplicitProducerThrowsUOEWhenExplictDestinationIsProvided() throws Exception {
JmsDestination dest = new JmsQueue("explicitDestination");
- MessageProducer producer = session.createProducer(new JmsQueue());
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest);
Message message = Mockito.mock(Message.class);
try {
@@ -138,16 +180,30 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
}
try {
+ producer.send(dest, message, completionListener);
+ fail("Expected exception not thrown");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
+
+ try {
producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
+
+ try {
+ producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
+ fail("Expected exception not thrown");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
}
@Test(timeout = 10000)
public void testAnonymousDestinationProducerThrowsIDEWhenNullDestinationIsProvided() throws Exception {
- MessageProducer producer = session.createProducer(null);
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
Message message = Mockito.mock(Message.class);
try {
@@ -158,10 +214,369 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport {
}
try {
+ producer.send(null, message, completionListener);
+ fail("Expected exception not thrown");
+ } catch (InvalidDestinationException ide) {
+ // expected
+ }
+
+ try {
producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception not thrown");
} catch (InvalidDestinationException ide) {
// expected
}
+
+ try {
+ producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
+ fail("Expected exception not thrown");
+ } catch (InvalidDestinationException ide) {
+ // expected
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testAnonymousProducerThrowsIAEWhenNullCompletionListenerProvided() throws Exception {
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
+ JmsDestination dest = new JmsQueue("explicitDestination");
+
+ Message message = Mockito.mock(Message.class);
+
+ try {
+ producer.send(dest, message, null);
+ fail("Expected exception not thrown");
+ } catch (IllegalArgumentException iae) {
+ // expected
+ }
+
+ try {
+ producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null);
+ fail("Expected exception not thrown");
+ } catch (IllegalArgumentException iae) {
+ // expected
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testExplicitProducerThrowsIAEWhenNullCompletionListenerIsProvided() throws Exception {
+ JmsDestination dest = new JmsQueue("explicitDestination");
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest);
+
+ Message message = Mockito.mock(Message.class);
+ try {
+ producer.send(message, null);
+ fail("Expected exception not thrown");
+ } catch (IllegalArgumentException iae) {
+ // expected
+ }
+
+ try {
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null);
+ fail("Expected exception not thrown");
+ } catch (IllegalArgumentException iae) {
+ // expected
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testInOrderSendAcksCompletionsReturnInOrder() throws Exception {
+ final int MESSAGE_COUNT = 3;
+
+ final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "mock://localhost?mock.delayCompletionCalls=true");
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final Destination destination = new JmsQueue("explicitDestination");
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+ final MyCompletionListener listener = new MyCompletionListener();
+
+ sendMessages(MESSAGE_COUNT, producer, listener);
+
+ assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+ }
+ }));
+
+ remotePoor.completeAllPendingSends(destination);
+
+ assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.getCompletedSends().size() == MESSAGE_COUNT;
+ }
+ }));
+
+ assertMessageCompletedInOrder(MESSAGE_COUNT, listener);
+
+ connection.close();
+ }
+
+ @Test(timeout = 10000)
+ public void testReversedOrderSendAcksCompletionsReturnInOrder() throws Exception {
+ final int MESSAGE_COUNT = 3;
+
+ final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "mock://localhost?mock.delayCompletionCalls=true");
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final Destination destination = new JmsQueue("explicitDestination");
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+ final MyCompletionListener listener = new MyCompletionListener();
+
+ sendMessages(MESSAGE_COUNT, producer, listener);
+
+ assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+ }
+ }));
+
+ List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
+ assertEquals(MESSAGE_COUNT, pending.size());
+ Collections.reverse(pending);
+
+ for (JmsOutboundMessageDispatch envelope : pending) {
+ LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
+ remotePoor.completePendingSend(envelope);
+ }
+
+ assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.getCompletedSends().size() == MESSAGE_COUNT;
+ }
+ }));
+
+ assertMessageCompletedInOrder(MESSAGE_COUNT, listener);
+
+ connection.close();
+ }
+
+ @Test(timeout = 10000)
+ public void testInOrderSendFailuresCompletionsReturnInOrder() throws Exception {
+ final int MESSAGE_COUNT = 3;
+
+ final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "mock://localhost?mock.delayCompletionCalls=true");
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final Destination destination = new JmsQueue("explicitDestination");
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+ final MyCompletionListener listener = new MyCompletionListener();
+
+ sendMessages(MESSAGE_COUNT, producer, listener);
+ assertTrue("Not all messages sent", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+ }
+ }));
+ remotePoor.failAllPendingSends(destination, new JMSException("Could not send message"));
+
+ assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.getFailedSends().size() == MESSAGE_COUNT;
+ }
+ }));
+
+ assertMessageFailedInOrder(MESSAGE_COUNT, listener);
+
+ connection.close();
+ }
+
+ @Test(timeout = 10000)
+ public void testReversedOrderSendAcksFailuresReturnInOrder() throws Exception {
+ final int MESSAGE_COUNT = 3;
+
+ final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "mock://localhost?mock.delayCompletionCalls=true");
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final Destination destination = new JmsQueue("explicitDestination");
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+ final MyCompletionListener listener = new MyCompletionListener();
+
+ sendMessages(MESSAGE_COUNT, producer, listener);
+
+ assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+ }
+ }));
+
+ List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
+ assertEquals(MESSAGE_COUNT, pending.size());
+ Collections.reverse(pending);
+
+ for (JmsOutboundMessageDispatch envelope : pending) {
+ LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
+ remotePoor.failPendingSend(envelope, new JMSException("Failed to send message"));
+ }
+
+ assertTrue("Not all failures triggered", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.getFailedSends().size() == MESSAGE_COUNT;
+ }
+ }));
+
+ assertMessageFailedInOrder(MESSAGE_COUNT, listener);
+
+ connection.close();
+ }
+
+ @Test(timeout = 10000)
+ public void testInterleavedCompletionsReturnedInOrder() throws Exception {
+ final int MESSAGE_COUNT = 3;
+
+ final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
+
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "mock://localhost?mock.delayCompletionCalls=true");
+
+ Connection connection = factory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ final Destination destination = new JmsQueue("explicitDestination");
+ JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
+ final MyCompletionListener listener = new MyCompletionListener();
+
+ sendMessages(MESSAGE_COUNT, producer, listener);
+
+ assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
+ }
+ }));
+
+ List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
+ assertEquals(MESSAGE_COUNT, pending.size());
+ Collections.reverse(pending);
+
+ for (JmsOutboundMessageDispatch envelope : pending) {
+ int sequence = envelope.getMessage().getIntProperty("sequence");
+ if (sequence % 2 == 0) {
+ LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
+ remotePoor.completePendingSend(envelope);
+ } else {
+ LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
+ remotePoor.failPendingSend(envelope, new JMSException("Failed to send message"));
+ }
+ }
+
+ assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return listener.getCombinedSends().size() == MESSAGE_COUNT;
+ }
+ }));
+
+ assertTotalCompletionOrder(MESSAGE_COUNT, listener);
+
+ connection.close();
+ }
+
+ private void sendMessages(int count, JmsMessageProducer producer, MyCompletionListener listener) throws Exception {
+ for (int i = 0; i < count; ++i) {
+ Message message = session.createMessage();
+ message.setIntProperty("sequence", i);
+
+ producer.send(message, listener);
+ }
+ }
+
+ private void assertMessageCompletedInOrder(int expected, MyCompletionListener listener) throws Exception {
+ assertEquals("Did not get expected number of completions", expected, listener.completed.size());
+ for (int i = 0; i < listener.completed.size(); ++i) {
+ int sequence = listener.completed.get(i).getIntProperty("sequence");
+ assertEquals("Did not complete expected message: " + i + " got: " + sequence, i, sequence);
+ }
+ }
+
+ private void assertMessageFailedInOrder(int expected, MyCompletionListener listener) throws Exception {
+ assertEquals("Did not get expected number of failures", expected, listener.failed.size());
+ for (int i = 0; i < listener.failed.size(); ++i) {
+ int sequence = listener.failed.get(i).getIntProperty("sequence");
+ assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence);
+ }
+ }
+
+ private void assertTotalCompletionOrder(int expected, MyCompletionListener listener) throws Exception {
+ assertEquals("Did not get expected number of failures", expected, listener.combinedResult.size());
+ for (int i = 0; i < listener.combinedResult.size(); ++i) {
+ int sequence = listener.combinedResult.get(i).getIntProperty("sequence");
+ assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence);
+ }
+ }
+
+ private class MyCompletionListener implements JmsCompletionListener {
+
+ private final List<Message> completed = new ArrayList<Message>();
+ private final List<Message> failed = new ArrayList<Message>();
+ private final List<Message> combinedResult = new ArrayList<Message>();
+
+ @Override
+ public void onCompletion(Message message) {
+ try {
+ LOG.debug("Recording completed send: {}", message.getJMSMessageID());
+ } catch (JMSException e) {
+ }
+ completed.add(message);
+ combinedResult.add(message);
+ }
+
+ @Override
+ public void onException(Message message, Exception exception) {
+ try {
+ LOG.debug("Recording failed send: {} -> error {}", message.getJMSMessageID(), exception.getMessage());
+ } catch (JMSException e) {
+ }
+ failed.add(message);
+ combinedResult.add(message);
+ }
+
+ public List<Message> getCombinedSends() {
+ return combinedResult;
+ }
+
+ public List<Message> getCompletedSends() {
+ return completed;
+ }
+
+ public List<Message> getFailedSends() {
+ return failed;
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org