You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/08/28 18:34:28 UTC
svn commit: r1621163 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/impl/
Author: robbie
Date: Thu Aug 28 16:34:28 2014
New Revision: 1621163
URL: http://svn.apache.org/r1621163
Log:
QPIDJMS-26: add basic consumer credit replenishment
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Thu Aug 28 16:34:28 2014
@@ -41,6 +41,14 @@ public class AmqpReceiver extends AmqpLi
_protonReceiver = protonReceiver;
}
+ public int getCredit()
+ {
+ synchronized (getAmqpConnection())
+ {
+ return _protonReceiver.getCredit();
+ }
+ }
+
public void credit(int credit)
{
synchronized (getAmqpConnection())
@@ -58,8 +66,7 @@ public class AmqpReceiver extends AmqpLi
void processDeliveryUpdate(Delivery delivery)
{
//TODO: this is currently processing all messages for the link, should really just do the one given.
- // We can't call recv if the passed delivery is not the 'current', but cant throw the event away either (could be a before-complete disposition change?)
- // Doesnt handle settlement yet.
+ // We can't call recv if the passed delivery is not the 'current'
Delivery currentDelivery = _protonReceiver.current();
if(currentDelivery != null)
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ClientProperties.java Thu Aug 28 16:34:28 2014
@@ -48,4 +48,5 @@ public class ClientProperties
//Client configuration System Property names
public static final String QPID_SET_JMSXUSERID_ON_SEND = "qpid.set-jmsxuserid-on-send";
+ public static final String QPID_DEFAULT_CONSUMER_PREFETCH = "qpid.default-consumer-prefetch";
}
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Thu Aug 28 16:34:28 2014
@@ -34,16 +34,91 @@ public class ReceiverImpl extends LinkIm
{
private final AmqpReceiver _amqpReceiver;
private final SessionImpl _sessionImpl;
- private final Destination _recieverDestination;
+ private final Destination _receiverDestination;
+ private int _prefetchSize = Integer.getInteger(ClientProperties.QPID_DEFAULT_CONSUMER_PREFETCH, 10);
public ReceiverImpl(ConnectionImpl connectionImpl, SessionImpl sessionImpl, AmqpReceiver amqpReceiver, Destination recieverDestination)
{
super(connectionImpl, amqpReceiver);
_sessionImpl = sessionImpl;
_amqpReceiver = amqpReceiver;
- _recieverDestination = recieverDestination;
+ _receiverDestination = recieverDestination;
}
+ public int getPrefetchSize()
+ {
+ return _prefetchSize;
+ }
+
+ public void setPrefetchSize(int prefetchSize)
+ {
+ _prefetchSize = prefetchSize;
+ }
+
+ void credit(int credit)
+ {
+ getConnectionImpl().lock();
+ try
+ {
+ _amqpReceiver.credit(credit);
+ getConnectionImpl().stateChanged();
+ }
+ finally
+ {
+ getConnectionImpl().releaseLock();
+ }
+ }
+
+ boolean flowIfNecessary()
+ {
+ //TODO: do the different session types have any bearing here?
+ if(_prefetchSize == 0)
+ {
+ return false;
+ }
+
+ synchronized (getConnectionImpl().getAmqpConnection())
+ {
+ int credit = _amqpReceiver.getCredit();
+ if (credit <= _prefetchSize / 2)
+ {
+ int topUp = _prefetchSize - credit;
+ _amqpReceiver.credit(topUp);
+
+ return true;
+ }
+
+ return false;
+ }
+ }
+
+ private final class MessageReceivedPredicate extends SimplePredicate
+ {
+ AmqpMessage _message;
+
+ public MessageReceivedPredicate()
+ {
+ super("Message received", _amqpReceiver);
+ }
+
+ @Override
+ public boolean test()
+ {
+ if(_message == null)
+ {
+ _message = _amqpReceiver.receiveNoWait();
+ }
+ return _message != null;
+ }
+
+ public AmqpMessage getReceivedMessage()
+ {
+ return _message;
+ }
+ }
+
+ //======= JMS Methods =======
+
@Override
public Message receive() throws JMSException
{
@@ -67,7 +142,7 @@ public class ReceiverImpl extends LinkIm
AmqpMessage receivedAmqpMessage = messageReceievedCondition.getReceivedMessage();
//TODO: don't create a new factory for every message
- Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl(), _recieverDestination);
+ Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl(), _receiverDestination);
//TODO: accepting/settling will be acknowledge-mode dependent
if(_sessionImpl.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
@@ -79,6 +154,8 @@ public class ReceiverImpl extends LinkIm
throw new UnsupportedOperationException("Only Auto-Ack currently supported");
}
+ flowIfNecessary();
+
getConnectionImpl().stateChanged();
return receivedMessage;
@@ -94,45 +171,6 @@ public class ReceiverImpl extends LinkIm
}
}
- public void credit(int credit)
- {
- getConnectionImpl().lock();
- try
- {
- _amqpReceiver.credit(credit);
- getConnectionImpl().stateChanged();
- }
- finally
- {
- getConnectionImpl().releaseLock();
- }
- }
-
- private final class MessageReceivedPredicate extends SimplePredicate
- {
- AmqpMessage _message;
-
- public MessageReceivedPredicate()
- {
- super("Message received", _amqpReceiver);
- }
-
- @Override
- public boolean test()
- {
- if(_message == null)
- {
- _message = _amqpReceiver.receiveNoWait();
- }
- return _message != null;
- }
-
- public AmqpMessage getReceivedMessage()
- {
- return _message;
- }
- }
-
@Override
public String getMessageSelector() throws JMSException
{
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Thu Aug 28 16:34:28 2014
@@ -49,8 +49,6 @@ import org.apache.qpid.jms.engine.AmqpSe
public class SessionImpl implements Session
{
- private static final int INITIAL_RECEIVER_CREDIT = 1;
-
private final int _acknowledgeMode;
private final AmqpSession _amqpSession;
private final ConnectionImpl _connectionImpl;
@@ -116,11 +114,13 @@ public class SessionImpl implements Sess
_connectionImpl.waitForResult(request, "Exception while creating sender to: " + address);
+ //TODO: per-consumer prefetch override?
+
if(_connectionImpl.isStarted())
{
- //Issue initial flow for the consumer.
+ //Issue initial flow for the consumer, if required.
//TODO: decide on prefetch behaviour, i.e. whether we defer flow or do it now, and what value to use.
- amqpReceiver.credit(INITIAL_RECEIVER_CREDIT);
+ receiver.flowIfNecessary();
_connectionImpl.stateChanged();
}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java?rev=1621163&r1=1621162&r2=1621163&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java Thu Aug 28 16:34:28 2014
@@ -29,6 +29,7 @@ import javax.jms.Message;
import javax.jms.Session;
import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpConnection;
import org.apache.qpid.jms.engine.AmqpGenericMessage;
import org.apache.qpid.jms.engine.AmqpMessage;
import org.apache.qpid.jms.engine.AmqpReceiver;
@@ -39,6 +40,7 @@ import org.mockito.Mockito;
public class ReceiverImplTest extends QpidJmsTestCase
{
private ConnectionImpl _mockConnection;
+ private AmqpConnection _mockAmqpConnection;
private AmqpReceiver _mockAmqpReceiver;
private SessionImpl _mockSession;
private AmqpMessage _mockAmqpMessage;
@@ -49,6 +51,7 @@ public class ReceiverImplTest extends Qp
{
super.setUp();
_mockConnection = Mockito.mock(ConnectionImpl.class);
+ _mockAmqpConnection = Mockito.mock(AmqpConnection.class);
_mockAmqpReceiver = Mockito.mock(AmqpReceiver.class);
_mockSession = Mockito.mock(SessionImpl.class);
Mockito.when(_mockSession.getDestinationHelper()).thenReturn(new DestinationHelper());
@@ -74,6 +77,7 @@ public class ReceiverImplTest extends Qp
Mockito.when(_mockConnection.isStarted()).thenReturn(true);
Mockito.when(_mockAmqpReceiver.receiveNoWait()).thenReturn(_mockAmqpMessage);
Mockito.when(_mockSession.getConnectionImpl()).thenReturn(_mockConnection);
+ Mockito.when(_mockConnection.getAmqpConnection()).thenReturn(_mockAmqpConnection);
Mockito.when(_mockSession.getAcknowledgeMode()).thenReturn(Session.AUTO_ACKNOWLEDGE);
ImmediateWaitUntil.mockWaitUntil(_mockConnection);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org