You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/21 17:08:30 UTC
svn commit: r1619428 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/impl/
Author: robbie
Date: Thu Aug 21 15:08:30 2014
New Revision: 1619428
URL: http://svn.apache.org/r1619428
Log:
QPIDJMS-27: update message sending to use events to signal completion
Removed:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Thu Aug 21 15:08:30 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.jms.engine.temp.A
import org.apache.qpid.jms.engine.temp.EventHandler;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
@@ -394,5 +395,14 @@ public class AmqpConnection extends Amqp
_pendingCloseLinks.remove(link);//TODO: delete pending close links?
}
}
+
+ // == Delivery ==
+
+ @Override
+ public void onDelivery(Delivery delivery)
+ {
+ AmqpLink link = (AmqpLink) delivery.getLink().getContext();
+ link.processDeliveryUpdate(delivery);
+ }
}
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Thu Aug 21 15:08:30 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.engine;
+import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
public abstract class AmqpLink extends AmqpResource
@@ -73,4 +74,6 @@ public abstract class AmqpLink extends A
_amqpConnection.addPendingCloseLink(_protonLink);
_protonLink.close();
}
+
+ abstract void processDeliveryUpdate(Delivery delivery);
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Thu Aug 21 15:08:30 2014
@@ -91,6 +91,12 @@ public class AmqpReceiver extends AmqpLi
}
@Override
+ void processDeliveryUpdate(Delivery delivery)
+ {
+ //TODO: implement receiver delivery update event processing
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java Thu Aug 21 15:08:30 2014
@@ -38,7 +38,7 @@ public class AmqpSender extends AmqpLink
_protonSender = protonSender;
}
- public AmqpSentMessageToken sendMessage(AmqpMessage amqpMessage)
+ public AmqpSentMessageToken sendMessage(AmqpMessage amqpMessage, AmqpResourceRequest<?> request)
{
synchronized (getAmqpConnection())
{
@@ -67,7 +67,7 @@ public class AmqpSender extends AmqpLink
_protonSender.send(_buffer, 0, encoded);
_protonSender.advance();
- AmqpSentMessageToken amqpSentMessageToken = new AmqpSentMessageToken(del, this);
+ AmqpSentMessageToken amqpSentMessageToken = new AmqpSentMessageToken(del, this, request);
del.setContext(amqpSentMessageToken);
return amqpSentMessageToken;
@@ -75,6 +75,14 @@ public class AmqpSender extends AmqpLink
}
@Override
+ void processDeliveryUpdate(Delivery delivery)
+ {
+ AmqpSentMessageToken token = (AmqpSentMessageToken) delivery.getContext();
+
+ token.processDeliveryUpdate();
+ }
+
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder();
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java Thu Aug 21 15:08:30 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.jms.engine;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.engine.Delivery;
@@ -28,20 +29,14 @@ public class AmqpSentMessageToken
private Delivery _delivery;
private AmqpSender _amqpSender;
private AmqpConnection _amqpConnection;
+ private AmqpResourceRequest<?> _request;
- public AmqpSentMessageToken(Delivery delivery, AmqpSender sender)
+ public AmqpSentMessageToken(Delivery delivery, AmqpSender sender, AmqpResourceRequest<?> request)
{
_delivery = delivery;
_amqpSender = sender;
_amqpConnection = _amqpSender.getAmqpConnection();
- }
-
- public DeliveryState getRemoteDeliveryState()
- {
- synchronized (_amqpConnection)
- {
- return _delivery.getRemoteState();
- }
+ _request = request;
}
public void settle()
@@ -60,4 +55,21 @@ public class AmqpSentMessageToken
.append("]");
return builder.toString();
}
+
+ void processDeliveryUpdate()
+ {
+ DeliveryState remoteDeliveryState = _delivery.getRemoteState();
+ if(Accepted.getInstance().equals(remoteDeliveryState))
+ {
+ if(_request != null)
+ {
+ _request.onSuccess(null);
+ _request = null;
+ }
+ }
+ //TODO: check for transactional state acceptance
+
+ //TODO: exception if it isn't accepted.
+ }
+
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Thu Aug 21 15:08:30 2014
@@ -31,6 +31,7 @@ import javax.jms.Message;
import javax.jms.MessageProducer;
import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
import org.apache.qpid.jms.engine.AmqpSender;
import org.apache.qpid.jms.engine.AmqpSentMessageToken;
@@ -153,13 +154,22 @@ public class SenderImpl extends LinkImpl
}
}
- AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
+ //TODO: we should probably only sync-send for persistent messages, when not otherwise request to send everything sync or async.
+ AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
- getConnectionImpl().stateChanged();
+ AmqpSentMessageToken sentMessage = null;
+ synchronized (getConnectionImpl().getAmqpConnection())
+ {
+ sentMessage = _amqpSender.sendMessage(amqpMessage, request);
+
+ getConnectionImpl().stateChanged();
+ }
+
+ getConnectionImpl().waitForResult(request, "Exception while sending message");
- SentMessageTokenImpl sentMessageImpl = new SentMessageTokenImpl(sentMessage, this);
- sentMessageImpl.waitUntilAccepted();
+ //TODO: we might not want to settle immediately, or at all, in certain cases. Should perhaps do this during result event processing.
sentMessage.settle();
+ //TODO: should we flush the transport again now for the settle? Might not need to if done during result event processing.
}
finally
{
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Thu Aug 21 15:08:30 2014
@@ -27,16 +27,19 @@ import static org.junit.Assert.assertSam
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
+import java.io.IOException;
+
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.Queue;
import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
import org.apache.qpid.jms.engine.AmqpSender;
import org.apache.qpid.jms.engine.AmqpSentMessageToken;
import org.apache.qpid.jms.engine.TestAmqpMessage;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -45,6 +48,7 @@ public class SenderImplTest extends Qpid
{
private static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
private ConnectionImpl _mockConnection;
+ private AmqpConnection _mockAmqpConnection;
private AmqpSender _mockAmqpSender;
private SessionImpl _mockSession;
private Queue _mockQueue;
@@ -56,6 +60,8 @@ public class SenderImplTest extends Qpid
{
super.setUp();
_mockConnection = Mockito.mock(ConnectionImpl.class);
+ _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+ Mockito.when(_mockConnection.getAmqpConnection()).thenReturn(_mockAmqpConnection);
_mockAmqpSender = Mockito.mock(AmqpSender.class);
_mockSession = Mockito.mock(SessionImpl.class);
Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
@@ -69,11 +75,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderOverriddesMessageDeliveryMode() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -91,11 +93,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderSetsJMSDestinationOnMessage() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -111,11 +109,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderSetsJMSTimestampOnMessage() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -133,11 +127,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderSetsJMSMessageIDOnMessage() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -155,11 +145,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderSetsJMSPriorityOnMessage() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -177,11 +163,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderSetsAbsoluteExpiryAndTtlFieldsOnUnderlyingMessage() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -213,11 +195,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderSetsTtlOnUnderlyingAmqpMessage() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -236,11 +214,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderClearsExistingJMSExpirationAndTtlFieldOnUnderlyingAmqpMessageWhenNotUsingTtl() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -268,11 +242,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderUsesJMS_AMQP_TTLPropertyToSetUnderlyingTtlFieldWhenNoProducerTTLInEffect() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -294,11 +264,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderUsesJMS_AMQP_TTLPropertyToSetUnderlyingTtlFieldWhenProducerTTLInEffect() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -323,11 +289,7 @@ public class SenderImplTest extends Qpid
@Test
public void testSenderUsesJMS_AMQP_TTLPropertyValueZeroToClearUnderlyingTtlField() throws Exception
{
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -349,7 +311,6 @@ public class SenderImplTest extends Qpid
assertNull(testMessage.getUnderlyingAmqpMessage(false).getTtl());
}
- //TODO: delete this marker comment and finish test
/**
* Test that the producer sets the JMSXUserID property with the
* user name for the connection the message is being sent on.
@@ -378,12 +339,7 @@ public class SenderImplTest extends Qpid
setTestSystemProperty(ClientProperties.QPID_SET_JMSXUSERID_ON_SEND, "false");
}
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -421,12 +377,7 @@ public class SenderImplTest extends Qpid
//disable setting the user-id on send
setTestSystemProperty(ClientProperties.QPID_SET_JMSXUSERID_ON_SEND, "false");
- //Create mock sent message token, ensure that it is immediately marked as Accepted
- AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
- Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
- Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-
- ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ basePreparationForMockSending();
SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
@@ -445,4 +396,15 @@ public class SenderImplTest extends Qpid
assertNull("expected JMSXUserID be null, but was: " + value, value);
assertFalse("JMSXUserID property should not exist", testMessage.propertyExists(ClientProperties.JMSXUSERID));
}
+
+ private void basePreparationForMockSending() throws JmsTimeoutException, JmsInterruptedException, IOException
+ {
+ //Create mock request, ensuring that it immediately returns as completed
+ AmqpResourceRequest<?> mockRequest = Mockito.mock(AmqpResourceRequest.class);
+ AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+ Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class), Mockito.any(AmqpResourceRequest.class))).thenReturn(_mockToken);
+ Mockito.when(mockRequest.getResult()).thenReturn(null);
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org