You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/18 17:11:23 UTC

svn commit: r530049 - /incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Author: ritchiem
Date: Wed Apr 18 08:11:22 2007
New Revision: 530049

URL: http://svn.apache.org/viewvc?view=rev&rev=530049
Log:
QPID-455 Prefetched messages can cause problems with client tools.
Removed the changes as this was causing problems. Guarded with a check for now but solution is till not correct.

Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=530049&r1=530048&r2=530049
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Apr 18 08:11:22 2007
@@ -1932,6 +1932,24 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
+        if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+        {
+//            if (!connectionStopped)
+            {
+                if (isSuspended() && _firstDispatcher.getAndSet(false))
+                {
+                    try
+                    {
+                        suspendChannel(false);
+                    }
+                    catch (AMQException e)
+                    {
+                        _logger.info("Suspending channel threw an exception:" + e);
+                    }
+                }
+            }
+        }
+
         startDistpatcherIfNecessary(false);
     }
 
@@ -1948,24 +1966,6 @@
         {
             _dispatcher.setConnectionStopped(initiallyStopped);
         }
-
-        if (!AMQSession.this._closed.get()
-            && AMQSession.this._startedAtLeastOnce.get()
-            && _firstDispatcher.getAndSet(false))
-        {
-            if (isSuspended())
-            {
-                try
-                {
-                    suspendChannel(false);
-                }
-                catch (AMQException e)
-                {
-                    _logger.info("Suspending channel threw an exception:" + e);
-                }
-            }
-        }
-
     }
 
     void stop() throws AMQException
@@ -1998,17 +1998,23 @@
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
-        if (_dispatcher == null)
+        // The dispatcher will be null if we have just created this session
+        // so suspend the channel before we register our consumer so that we don't
+        // start prefetching until a receive/mListener is set.
+        if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
         {
-            if (!isSuspended())
+            if (_dispatcher == null)
             {
-                try
-                {
-                    suspendChannel(true);
-                }
-                catch (AMQException e)
+                if (!isSuspended())
                 {
-                    _logger.info("Suspending channel threw an exception:" + e);
+                    try
+                    {
+                        suspendChannel(true);
+                    }
+                    catch (AMQException e)
+                    {
+                        _logger.info("Suspending channel threw an exception:" + e);
+                    }
                 }
             }
         }