You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/28 18:34:28 UTC

svn commit: r1621163 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Thu Aug 28 16:34:28 2014
New Revision: 1621163

URL: http://svn.apache.org/r1621163
Log:
QPIDJMS-26: add basic consumer credit replenishment

Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Thu Aug 28 16:34:28 2014
@@ -41,6 +41,14 @@ public class AmqpReceiver extends AmqpLi
         _protonReceiver = protonReceiver;
     }
 
+    public int getCredit()
+    {
+        synchronized (getAmqpConnection())
+        {
+            return _protonReceiver.getCredit();
+        }
+    }
+
     public void credit(int credit)
     {
         synchronized (getAmqpConnection())
@@ -58,8 +66,7 @@ public class AmqpReceiver extends AmqpLi
     void processDeliveryUpdate(Delivery delivery)
     {
         //TODO: this is currently processing all messages for the link, should really just do the one given.
-        // We can't call recv if the passed delivery is not the 'current', but cant throw the event away either (could be a before-complete disposition change?)
-        // Doesnt handle settlement yet.
+        // We can't call recv if the passed delivery is not the 'current'
 
         Delivery currentDelivery = _protonReceiver.current();
         if(currentDelivery != null)

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java Thu Aug 28 16:34:28 2014
@@ -48,4 +48,5 @@ public class ClientProperties
 
     //Client configuration System Property names
     public static final String QPID_SET_JMSXUSERID_ON_SEND = "qpid.set-jmsxuserid-on-send";
+    public static final String QPID_DEFAULT_CONSUMER_PREFETCH = "qpid.default-consumer-prefetch";
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Thu Aug 28 16:34:28 2014
@@ -34,16 +34,91 @@ public class ReceiverImpl extends LinkIm
 {
     private final AmqpReceiver _amqpReceiver;
     private final SessionImpl _sessionImpl;
-    private final Destination _recieverDestination;
+    private final Destination _receiverDestination;
+    private int _prefetchSize = Integer.getInteger(ClientProperties.QPID_DEFAULT_CONSUMER_PREFETCH, 10);
 
     public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver, Destination recieverDestination)
     {
         super(connectionImpl, amqpReceiver);
         _sessionImpl = sessionImpl;
         _amqpReceiver = amqpReceiver;
-        _recieverDestination = recieverDestination;
+        _receiverDestination = recieverDestination;
     }
 
+    public int getPrefetchSize()
+    {
+        return _prefetchSize;
+    }
+
+    public void setPrefetchSize(int prefetchSize)
+    {
+        _prefetchSize = prefetchSize;
+    }
+
+    void credit(int credit)
+    {
+        getConnectionImpl().lock();
+        try
+        {
+            _amqpReceiver.credit(credit);
+            getConnectionImpl().stateChanged();
+        }
+        finally
+        {
+            getConnectionImpl().releaseLock();
+        }
+    }
+
+    boolean flowIfNecessary()
+    {
+        //TODO: do the different session types have any bearing here?
+        if(_prefetchSize == 0)
+        {
+            return false;
+        }
+
+        synchronized (getConnectionImpl().getAmqpConnection())
+        {
+            int credit = _amqpReceiver.getCredit();
+            if (credit <= _prefetchSize / 2)
+            {
+                int topUp = _prefetchSize - credit;
+                _amqpReceiver.credit(topUp);
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    private final class MessageReceivedPredicate extends SimplePredicate
+    {
+        AmqpMessage _message;
+
+        public MessageReceivedPredicate()
+        {
+            super("Message received", _amqpReceiver);
+        }
+
+        @Override
+        public boolean test()
+        {
+            if(_message == null)
+            {
+                _message = _amqpReceiver.receiveNoWait();
+            }
+            return _message != null;
+        }
+
+        public AmqpMessage getReceivedMessage()
+        {
+            return _message;
+        }
+    }
+
+    //======= JMS Methods =======
+
     @Override
     public Message receive() throws JMSException
     {
@@ -67,7 +142,7 @@ public class ReceiverImpl extends LinkIm
             AmqpMessage receivedAmqpMessage = messageReceievedCondition.getReceivedMessage();
 
             //TODO: don't create a new factory for every message
-            Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl(), _recieverDestination);
+            Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl(), _receiverDestination);
 
             //TODO: accepting/settling will be acknowledge-mode dependent
             if(_sessionImpl.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
@@ -79,6 +154,8 @@ public class ReceiverImpl extends LinkIm
                 throw new UnsupportedOperationException("Only Auto-Ack currently supported");
             }
 
+            flowIfNecessary();
+
             getConnectionImpl().stateChanged();
 
             return receivedMessage;
@@ -94,45 +171,6 @@ public class ReceiverImpl extends LinkIm
         }
     }
 
-    public void credit(int credit)
-    {
-        getConnectionImpl().lock();
-        try
-        {
-            _amqpReceiver.credit(credit);
-            getConnectionImpl().stateChanged();
-        }
-        finally
-        {
-            getConnectionImpl().releaseLock();
-        }
-    }
-
-    private final class MessageReceivedPredicate extends SimplePredicate
-    {
-        AmqpMessage _message;
-
-        public MessageReceivedPredicate()
-        {
-            super("Message received", _amqpReceiver);
-        }
-
-        @Override
-        public boolean test()
-        {
-            if(_message == null)
-            {
-                _message = _amqpReceiver.receiveNoWait();
-            }
-            return _message != null;
-        }
-
-        public AmqpMessage getReceivedMessage()
-        {
-            return _message;
-        }
-    }
-
     @Override
     public String getMessageSelector() throws JMSException
     {

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Thu Aug 28 16:34:28 2014
@@ -49,8 +49,6 @@ import org.apache.qpid.jms.engine.AmqpSe
 
 public class SessionImpl implements Session
 {
-    private static final int INITIAL_RECEIVER_CREDIT = 1;
-
     private final int _acknowledgeMode;
     private final AmqpSession _amqpSession;
     private final ConnectionImpl _connectionImpl;
@@ -116,11 +114,13 @@ public class SessionImpl implements Sess
 
             _connectionImpl.waitForResult(request, "Exception while creating sender to: " + address);
 
+            //TODO: per-consumer prefetch override?
+
             if(_connectionImpl.isStarted())
             {
-                //Issue initial flow for the consumer.
+                //Issue initial flow for the consumer, if required.
                 //TODO: decide on prefetch behaviour, i.e. whether we defer flow or do it now, and what value to use.
-                amqpReceiver.credit(INITIAL_RECEIVER_CREDIT);
+                receiver.flowIfNecessary();
                 _connectionImpl.stateChanged();
             }
 

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java Thu Aug 28 16:34:28 2014
@@ -29,6 +29,7 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpGenericMessage;
 import org.apache.qpid.jms.engine.AmqpMessage;
 import org.apache.qpid.jms.engine.AmqpReceiver;
@@ -39,6 +40,7 @@ import org.mockito.Mockito;
 public class ReceiverImplTest extends QpidJmsTestCase
 {
     private ConnectionImpl _mockConnection;
+    private AmqpConnection _mockAmqpConnection;
     private AmqpReceiver _mockAmqpReceiver;
     private SessionImpl _mockSession;
     private AmqpMessage _mockAmqpMessage;
@@ -49,6 +51,7 @@ public class ReceiverImplTest extends Qp
     {
         super.setUp();
         _mockConnection = Mockito.mock(ConnectionImpl.class);
+        _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
         _mockAmqpReceiver = Mockito.mock(AmqpReceiver.class);
         _mockSession = Mockito.mock(SessionImpl.class);
         Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
@@ -74,6 +77,7 @@ public class ReceiverImplTest extends Qp
         Mockito.when(_mockConnection.isStarted()).thenReturn(true);
         Mockito.when(_mockAmqpReceiver.receiveNoWait()).thenReturn(_mockAmqpMessage);
         Mockito.when(_mockSession.getConnectionImpl()).thenReturn(_mockConnection);
+        Mockito.when(_mockConnection.getAmqpConnection()).thenReturn(_mockAmqpConnection);
         Mockito.when(_mockSession.getAcknowledgeMode()).thenReturn(Session.AUTO_ACKNOWLEDGE);
 
         ImmediateWaitUntil.mockWaitUntil(_mockConnection);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org