You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/21 17:08:30 UTC

svn commit: r1619428 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Thu Aug 21 15:08:30 2014
New Revision: 1619428

URL: http://svn.apache.org/r1619428
Log:
QPIDJMS-27: update message sending to use events to signal completion

Removed:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SentMessageTokenImpl.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Thu Aug 21 15:08:30 2014
@@ -31,6 +31,7 @@ import org.apache.qpid.jms.engine.temp.A
 import org.apache.qpid.jms.engine.temp.EventHandler;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Connection;
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Receiver;
@@ -394,5 +395,14 @@ public class AmqpConnection extends Amqp
                 _pendingCloseLinks.remove(link);//TODO: delete pending close links?
             }
         }
+
+        // == Delivery ==
+
+        @Override
+        public void onDelivery(Delivery delivery)
+        {
+            AmqpLink link = (AmqpLink) delivery.getLink().getContext();
+            link.processDeliveryUpdate(delivery);
+        }
     }
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Thu Aug 21 15:08:30 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.engine;
 
+import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Link;
 
 public abstract class AmqpLink extends AmqpResource
@@ -73,4 +74,6 @@ public abstract class AmqpLink extends A
         _amqpConnection.addPendingCloseLink(_protonLink);
         _protonLink.close();
     }
+
+    abstract void processDeliveryUpdate(Delivery delivery);
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Thu Aug 21 15:08:30 2014
@@ -91,6 +91,12 @@ public class AmqpReceiver extends AmqpLi
     }
 
     @Override
+    void processDeliveryUpdate(Delivery delivery)
+    {
+        //TODO: implement receiver delivery update event processing
+    }
+
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder();

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java Thu Aug 21 15:08:30 2014
@@ -38,7 +38,7 @@ public class AmqpSender extends AmqpLink
         _protonSender = protonSender;
     }
 
-    public AmqpSentMessageToken sendMessage(AmqpMessage amqpMessage)
+    public AmqpSentMessageToken sendMessage(AmqpMessage amqpMessage, AmqpResourceRequest<?> request)
     {
         synchronized (getAmqpConnection())
         {
@@ -67,7 +67,7 @@ public class AmqpSender extends AmqpLink
             _protonSender.send(_buffer, 0, encoded);
             _protonSender.advance();
 
-            AmqpSentMessageToken amqpSentMessageToken = new AmqpSentMessageToken(del, this);
+            AmqpSentMessageToken amqpSentMessageToken = new AmqpSentMessageToken(del, this, request);
             del.setContext(amqpSentMessageToken);
 
             return amqpSentMessageToken;
@@ -75,6 +75,14 @@ public class AmqpSender extends AmqpLink
     }
 
     @Override
+    void processDeliveryUpdate(Delivery delivery)
+    {
+        AmqpSentMessageToken token = (AmqpSentMessageToken) delivery.getContext();
+
+        token.processDeliveryUpdate();
+    }
+
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder();

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSentMessageToken.java Thu Aug 21 15:08:30 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.engine;
 
+import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.engine.Delivery;
 
@@ -28,20 +29,14 @@ public class AmqpSentMessageToken
     private Delivery _delivery;
     private AmqpSender _amqpSender;
     private AmqpConnection _amqpConnection;
+    private AmqpResourceRequest<?> _request;
 
-    public AmqpSentMessageToken(Delivery delivery, AmqpSender sender)
+    public AmqpSentMessageToken(Delivery delivery, AmqpSender sender, AmqpResourceRequest<?> request)
     {
         _delivery = delivery;
         _amqpSender = sender;
         _amqpConnection = _amqpSender.getAmqpConnection();
-    }
-
-    public DeliveryState getRemoteDeliveryState()
-    {
-        synchronized (_amqpConnection)
-        {
-            return _delivery.getRemoteState();
-        }
+        _request = request;
     }
 
     public void settle()
@@ -60,4 +55,21 @@ public class AmqpSentMessageToken
             .append("]");
         return builder.toString();
     }
+
+    void processDeliveryUpdate()
+    {
+        DeliveryState remoteDeliveryState = _delivery.getRemoteState();
+        if(Accepted.getInstance().equals(remoteDeliveryState))
+        {
+            if(_request != null)
+            {
+                _request.onSuccess(null);
+                _request = null;
+            }
+        }
+        //TODO: check for transactional state acceptance
+
+        //TODO: exception if it isn't accepted.
+    }
+
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Thu Aug 21 15:08:30 2014
@@ -31,6 +31,7 @@ import javax.jms.Message;
 import javax.jms.MessageProducer;
 
 import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
 import org.apache.qpid.jms.engine.AmqpSender;
 import org.apache.qpid.jms.engine.AmqpSentMessageToken;
 
@@ -153,13 +154,22 @@ public class SenderImpl extends LinkImpl
                 }
             }
 
-            AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
+            //TODO: we should probably only sync-send for persistent messages, when not otherwise request to send everything sync or async.
+            AmqpResourceRequest<Void> request = new AmqpResourceRequest<Void>();
 
-            getConnectionImpl().stateChanged();
+            AmqpSentMessageToken sentMessage = null;
+            synchronized (getConnectionImpl().getAmqpConnection())
+            {
+                sentMessage = _amqpSender.sendMessage(amqpMessage, request);
+
+                getConnectionImpl().stateChanged();
+            }
+
+            getConnectionImpl().waitForResult(request, "Exception while sending message");
 
-            SentMessageTokenImpl sentMessageImpl = new SentMessageTokenImpl(sentMessage, this);
-            sentMessageImpl.waitUntilAccepted();
+            //TODO: we might not want to settle immediately, or at all, in certain cases. Should perhaps do this during result event processing.
             sentMessage.settle();
+            //TODO: should we flush the transport again now for the settle? Might not need to if done during result event processing.
         }
         finally
         {

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1619428&r1=1619427&r2=1619428&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Thu Aug 21 15:08:30 2014
@@ -27,16 +27,19 @@ import static org.junit.Assert.assertSam
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertFalse;
 
+import java.io.IOException;
+
 import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.Queue;
 
 import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpResourceRequest;
 import org.apache.qpid.jms.engine.AmqpSender;
 import org.apache.qpid.jms.engine.AmqpSentMessageToken;
 import org.apache.qpid.jms.engine.TestAmqpMessage;
-import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -45,6 +48,7 @@ public class SenderImplTest extends Qpid
 {
     private static final String JMS_AMQP_TTL = "JMS_AMQP_TTL";
     private ConnectionImpl _mockConnection;
+    private AmqpConnection _mockAmqpConnection;
     private AmqpSender _mockAmqpSender;
     private SessionImpl _mockSession;
     private Queue _mockQueue;
@@ -56,6 +60,8 @@ public class SenderImplTest extends Qpid
     {
         super.setUp();
         _mockConnection = Mockito.mock(ConnectionImpl.class);
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
+        Mockito.when(_mockConnection.getAmqpConnection()).thenReturn(_mockAmqpConnection);
         _mockAmqpSender = Mockito.mock(AmqpSender.class);
         _mockSession = Mockito.mock(SessionImpl.class);
         Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
@@ -69,11 +75,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderOverriddesMessageDeliveryMode() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -91,11 +93,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderSetsJMSDestinationOnMessage() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -111,11 +109,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderSetsJMSTimestampOnMessage() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -133,11 +127,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderSetsJMSMessageIDOnMessage() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -155,11 +145,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderSetsJMSPriorityOnMessage() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -177,11 +163,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderSetsAbsoluteExpiryAndTtlFieldsOnUnderlyingMessage() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -213,11 +195,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderSetsTtlOnUnderlyingAmqpMessage() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -236,11 +214,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderClearsExistingJMSExpirationAndTtlFieldOnUnderlyingAmqpMessageWhenNotUsingTtl() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -268,11 +242,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderUsesJMS_AMQP_TTLPropertyToSetUnderlyingTtlFieldWhenNoProducerTTLInEffect() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -294,11 +264,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderUsesJMS_AMQP_TTLPropertyToSetUnderlyingTtlFieldWhenProducerTTLInEffect() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -323,11 +289,7 @@ public class SenderImplTest extends Qpid
     @Test
     public void testSenderUsesJMS_AMQP_TTLPropertyValueZeroToClearUnderlyingTtlField() throws Exception
     {
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -349,7 +311,6 @@ public class SenderImplTest extends Qpid
         assertNull(testMessage.getUnderlyingAmqpMessage(false).getTtl());
     }
 
-    //TODO: delete this marker comment and finish test
     /**
      * Test that the producer sets the JMSXUserID property with the
      * user name for the connection the message is being sent on.
@@ -378,12 +339,7 @@ public class SenderImplTest extends Qpid
             setTestSystemProperty(ClientProperties.QPID_SET_JMSXUSERID_ON_SEND, "false");
         }
 
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -421,12 +377,7 @@ public class SenderImplTest extends Qpid
         //disable setting the user-id on send
         setTestSystemProperty(ClientProperties.QPID_SET_JMSXUSERID_ON_SEND, "false");
 
-        //Create mock sent message token, ensure that it is immediately marked as Accepted
-        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
-        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
-        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
-
-        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+        basePreparationForMockSending();
 
         SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
 
@@ -445,4 +396,15 @@ public class SenderImplTest extends Qpid
         assertNull("expected JMSXUserID be null, but was: " + value, value);
         assertFalse("JMSXUserID property should not exist", testMessage.propertyExists(ClientProperties.JMSXUSERID));
     }
+
+    private void basePreparationForMockSending() throws JmsTimeoutException, JmsInterruptedException, IOException
+    {
+        //Create mock request, ensuring that it immediately returns as completed
+        AmqpResourceRequest<?> mockRequest = Mockito.mock(AmqpResourceRequest.class);
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class), Mockito.any(AmqpResourceRequest.class))).thenReturn(_mockToken);
+        Mockito.when(mockRequest.getResult()).thenReturn(null);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org