You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2008/02/06 16:14:43 UTC

svn commit: r619012 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQSession_0_10.java BasicMessageConsumer_0_10.java

Author: arnaudsimon
Date: Wed Feb  6 07:14:42 2008
New Revision: 619012

URL: http://svn.apache.org/viewvc?rev=619012&view=rev
Log:
Changed the consumer message flow mode from credit to window mode; see QPID-778.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=619012&r1=619011&r2=619012&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Feb  6 07:14:42 2008
@@ -375,7 +375,7 @@
                                           consumer.isNoLocal() ? Option.NO_LOCAL : Option.NO_OPTION,
                                           consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
 
-        getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_CREDIT);
+        getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_MODE_WINDOW);
         getQpidSession().messageFlow(consumer.getConsumerTag().toString(), Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
         // We need to sync so that we get notify of an error.
         if(consumer.isStrated())

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=619012&r1=619011&r2=619012&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Wed Feb  6 07:14:42 2008
@@ -46,10 +46,6 @@
 public class BasicMessageConsumer_0_10 extends BasicMessageConsumer<Struct[], ByteBuffer>
         implements org.apache.qpidity.nclient.util.MessageListener
 {
-     /**
-     * Number of received message so far
-     */
-    private final AtomicLong _messagesReceived = new AtomicLong(0);
 
     /**
      * This class logger
@@ -118,7 +114,6 @@
      */
     public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
     {
-        _messagesReceived.incrementAndGet();
         boolean messageOk = false;
         try
         {
@@ -143,20 +138,7 @@
         }
     }
 
-    /**
-     * Require more credit for this consumer
-     */
-    private void requireMoreCreditIfNecessary()
-    {
-        if (_isStarted && _messagesReceived.get() >= AMQSession_0_10.MAX_PREFETCH)
-        {
-            // require more credit
-            _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
-                    Session.MESSAGE_FLOW_UNIT_MESSAGE,
-                    AMQSession_0_10.MAX_PREFETCH);
-            _messagesReceived.set(0);
-        }
-    }
+
 
     /**
      * This method is invoked by the transport layer when a message is delivered for this
@@ -239,14 +221,6 @@
     {
         // notify the session
         ((AMQSession_0_10) getSession()).addMessageTag(msg.getDeliveryTag());
-        if (isMessageListenerSet())
-        {
-            requireMoreCreditIfNecessary();
-        }
-        else if (_synchronousQueue.isEmpty())
-        {
-            requireMoreCreditIfNecessary();
-        }
         //if (!Boolean.getBoolean("noAck"))
         //{
             super.postDeliver(msg);
@@ -458,7 +432,6 @@
                                                           org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
                                                           0xFFFFFFFF);
                 _0_10session.getQpidSession().sync();
-                _messagesReceived.set(0);
             }
         }
     }