You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/04/18 17:11:23 UTC
svn commit: r530049 -
/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Author: ritchiem
Date: Wed Apr 18 08:11:22 2007
New Revision: 530049
URL: http://svn.apache.org/viewvc?view=rev&rev=530049
Log:
QPID-455 Prefetched messages can cause problems with client tools.
Removed the changes as this was causing problems. Guarded with a check for now, but the solution is still not correct.
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=530049&r1=530048&r2=530049
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Apr 18 08:11:22 2007
@@ -1932,6 +1932,24 @@
synchronized void startDistpatcherIfNecessary()
{
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
+ {
+// if (!connectionStopped)
+ {
+ if (isSuspended() && _firstDispatcher.getAndSet(false))
+ {
+ try
+ {
+ suspendChannel(false);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
+ }
+ }
+ }
+
startDistpatcherIfNecessary(false);
}
@@ -1948,24 +1966,6 @@
{
_dispatcher.setConnectionStopped(initiallyStopped);
}
-
- if (!AMQSession.this._closed.get()
- && AMQSession.this._startedAtLeastOnce.get()
- && _firstDispatcher.getAndSet(false))
- {
- if (isSuspended())
- {
- try
- {
- suspendChannel(false);
- }
- catch (AMQException e)
- {
- _logger.info("Suspending channel threw an exception:" + e);
- }
- }
- }
-
}
void stop() throws AMQException
@@ -1998,17 +1998,23 @@
bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
- if (_dispatcher == null)
+ // The dispatcher will be null if we have just created this session
+ // so suspend the channel before we register our consumer so that we don't
+ // start prefetching until a receive/mListener is set.
+ if (Boolean.parseBoolean(System.getProperties().getProperty("REGISTER_CONSUMERS_FLOWED", "false")))
{
- if (!isSuspended())
+ if (_dispatcher == null)
{
- try
- {
- suspendChannel(true);
- }
- catch (AMQException e)
+ if (!isSuspended())
{
- _logger.info("Suspending channel threw an exception:" + e);
+ try
+ {
+ suspendChannel(true);
+ }
+ catch (AMQException e)
+ {
+ _logger.info("Suspending channel threw an exception:" + e);
+ }
}
}
}