You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/02/06 16:14:43 UTC
svn commit: r619012 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQSession_0_10.java BasicMessageConsumer_0_10.java
Author: arnaudsimon
Date: Wed Feb 6 07:14:42 2008
New Revision: 619012
URL: http://svn.apache.org/viewvc?rev=619012&view=rev
Log:
Changed for using Window mode see QPID-778
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=619012&r1=619011&r2=619012&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Feb 6 07:14:42 2008
@@ -375,7 +375,7 @@
consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
- getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+ getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
if(consumer.isStrated())
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=619012&r1=619011&r2=619012&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Feb 6 07:14:42 2008
@@ -46,10 +46,6 @@
public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
implements org.apache.qpidity.nclient.util.MessageListener
{
- /**
- * Number of received message so far
- */
- private final AtomicLong _messagesReceived = new AtomicLong(0);
/**
* This class logger
@@ -118,7 +114,6 @@
*/
public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
{
- _messagesReceived.incrementAndGet();
boolean messageOk = false;
try
{
@@ -143,20 +138,7 @@
}
}
- /**
- * Require more credit for this consumer
- */
- private void requireMoreCreditIfNecessary()
- {
- if (_isStarted && _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
- {
- // require more credit
- _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
- Session.MESSAGE_FLOW_UNIT_MESSAGE,
- AMQSession_0_10.MAX_PREFETCH);
- _messagesReceived.set(0);
- }
- }
+
/**
* This method is invoked by the transport layer when a message is delivered for this
@@ -239,14 +221,6 @@
{
// notify the session
((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
- if (isMessageListenerSet())
- {
- requireMoreCreditIfNecessary();
- }
- else if (_synchronousQueue.isEmpty())
- {
- requireMoreCreditIfNecessary();
- }
//if (!Boolean.getBoolean("noAck"))
//{
super.postDeliver(msg);
@@ -458,7 +432,6 @@
org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
0xFFFFFFFF);
_0_10session.getQpidSession().sync();
- _messagesReceived.set(0);
}
}
}