You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/01 20:09:11 UTC

svn commit: r820739 - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/configuration/ broker/src/main/java/org/apache/qpid/server/logging/messages/ broker/src/main/java/org/apache/qpid/s...

Author: rgodfrey
Date: Thu Oct  1 18:09:10 2009
New Revision: 820739

URL: http://svn.apache.org/viewvc?rev=820739&view=rev
Log:
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
      - copied, changed from r820624, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
    qpid/trunk/qpid/java/test-profiles/010Excludes

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Oct  1 18:09:10 2009
@@ -27,13 +27,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -42,12 +41,8 @@
 import org.apache.qpid.server.flow.FlowCreditManager;
 import org.apache.qpid.server.flow.Pre0_10CreditManager;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -59,6 +54,7 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -119,6 +115,11 @@
     private final AMQProtocolSession _session;
     private boolean _closing;
 
+    private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+    private final AtomicBoolean _blocking = new AtomicBoolean(false);
+
+
     private LogActor _actor;
     private LogSubject _logSubject;
 
@@ -798,6 +799,7 @@
                 _actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
             }
 
+
             // This section takes two different approaches to perform to perform
             // the same function. Ensuring that the Subscription has taken note
             // of the change in Channel State
@@ -978,4 +980,37 @@
     {
         return _actor;
     }
+
+    public void block(AMQQueue queue)
+    {
+        if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+        {
+
+            if(_blocking.compareAndSet(false,true))
+            {
+                _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString()));
+                flow(false);
+            }
+        }
+    }
+
+    public void unblock(AMQQueue queue) 
+    {
+        if(_blockingQueues.remove(queue))
+        {
+            if(_blocking.compareAndSet(true,false))
+            {
+                _actor.message(_logSubject, ChannelMessages.CHN_1006());
+
+                flow(true);
+            }
+        }
+    }
+
+    private void flow(boolean flow)
+    {
+        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
+        _session.writeFrame(responseBody.generateFrame(_channelId));
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Oct  1 18:09:10 2009
@@ -108,4 +108,14 @@
         return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
     }
 
+    public long getCapacity()
+    {
+        return _config.getLong("capacity", _vHostConfig.getCapacity());
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Thu Oct  1 18:09:10 2009
@@ -103,6 +103,8 @@
         envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
         envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
         envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
+        envVarMap.put("QPID_QUEUECAPACITY", "capacity");
+        envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
         envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer");
         envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
         envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
@@ -289,7 +291,6 @@
         return conf;
     }
 
-    @Override
     public void handle(Signal arg0)
     {
         try
@@ -507,6 +508,16 @@
         return getConfig().getLong("minimumAlertRepeatGap", 0);
     }
 
+    public long getCapacity()
+    {
+        return getConfig().getLong("capacity", 0L);
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return getConfig().getLong("flowResumeCapacity", getCapacity());
+    }
+
     public int getProcessors()
     {
         return getConfig().getInt("connector.processors", 4);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Oct  1 18:09:10 2009
@@ -166,4 +166,15 @@
         return _config.getLong("queues.minimumAlertRepeatGap", 0);
     }
 
+
+    public long getCapacity()
+    {
+        return _config.getLong("queues.capacity", 0l);
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return _config.getLong("queues.flowResumeCapacity", getCapacity());
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Thu Oct  1 18:09:10 2009
@@ -249,12 +249,16 @@
 # 0 - bytes allowed in prefetch
 # 1 - number of messagse. 
 CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
 
 #Queue
 # 0 - owner
 # 1 - priority
 QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
 QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
 
 #Exchange
 # 0 - type
@@ -269,4 +273,4 @@
 #Subscription
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
-SUB-1003 = State : {0}
\ No newline at end of file
+SUB-1003 = State : {0}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Oct  1 18:09:10 2009
@@ -160,6 +160,17 @@
     void setMinimumAlertRepeatGap(long value);
 
 
+    long getCapacity();
+
+    void setCapacity(long capacity);
+
+
+    long getFlowResumeCapacity();
+
+    void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
     void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
 
     long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@
 
     void stop();
 
+    void checkCapacity(AMQChannel channel);
+
     /**
      * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
      * already exists.

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Oct  1 18:09:10 2009
@@ -26,11 +26,105 @@
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
+import java.util.Map;
+import java.util.HashMap;
+
 
 public class AMQQueueFactory
 {
     public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
 
+    private abstract static class QueueProperty
+    {
+
+        private final AMQShortString _argumentName;
+
+
+        public QueueProperty(String argumentName)
+        {
+            _argumentName = new AMQShortString(argumentName);
+        }
+
+        public AMQShortString getArgumentName()
+        {
+            return _argumentName;
+        }
+
+
+        public abstract void setPropertyValue(AMQQueue queue, Object value);
+
+    }
+
+    private abstract static class QueueLongProperty extends QueueProperty
+    {
+
+        public QueueLongProperty(String argumentName)
+        {
+            super(argumentName);
+        }
+
+        public void setPropertyValue(AMQQueue queue, Object value)
+        {
+            if(value instanceof Number)
+            {
+                setPropertyValue(queue, ((Number)value).longValue());
+            }
+
+        }
+
+        abstract void setPropertyValue(AMQQueue queue, long value);
+
+
+    }
+
+    private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+            new QueueLongProperty("x-qpid-maximum-message-age")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumMessageAge(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-maximum-message-size")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumMessageSize(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-maximum-message-count")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMaximumMessageCount(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setMinimumAlertRepeatGap(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-capacity")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setCapacity(value);
+                }
+            },
+            new QueueLongProperty("x-qpid-flow-resume-capacity")
+            {
+                public void setPropertyValue(AMQQueue queue, long value)
+                {
+                    queue.setFlowResumeCapacity(value);
+                }
+            }
+
+    };
+
+
+
     public static AMQQueue createAMQQueueImpl(AMQShortString name,
                                               boolean durable,
                                               AMQShortString owner,
@@ -53,6 +147,18 @@
         //Register the new queue
         virtualHost.getQueueRegistry().registerQueue(q);
         q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+        if(arguments != null)
+        {
+            for(QueueProperty p : DECLAREABLE_PROPERTIES)
+            {
+                if(arguments.containsKey(p.getArgumentName()))
+                {
+                    p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+                }
+            }
+        }
+
         return q;
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Oct  1 18:09:10 2009
@@ -42,8 +42,7 @@
 
     private final SimpleQueueEntryList _queueEntryList;
 
-    private AMQMessage _message;
-
+    private final AMQMessage _message;
 
     private Set<Subscription> _rejectedBy = null;
 
@@ -191,7 +190,7 @@
 
     public boolean immediateAndNotDelivered() 
     {
-        return _message.immediateAndNotDelivered();
+        return getMessage().immediateAndNotDelivered();
     }
 
     public void setRedelivered(boolean b)
@@ -393,4 +392,5 @@
     {
         return _queueEntryList;
     }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Oct  1 18:09:10 2009
@@ -1,11 +1,10 @@
 package org.apache.qpid.server.queue;
 
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
 
 /*
 *
@@ -96,6 +96,8 @@
     private final Executor _asyncDelivery;
     private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
+    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
     /** max allowed size(KB) of a single message */
     public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
 
@@ -122,6 +124,10 @@
     private LogSubject _logSubject;
     private LogActor _logActor;
 
+
+    private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+    private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
     {
@@ -629,6 +635,8 @@
             throw new FailedDequeueException(_name.toString(), e);
         }
 
+        checkCapacity();
+
     }
 
     private void decrementQueueSize(final QueueEntry entry)
@@ -1173,6 +1181,58 @@
         }
     }
 
+    public void checkCapacity(AMQChannel channel)
+    {
+        if(_capacity != 0l)
+        {
+            if(_atomicQueueSize.get() > _capacity)
+            {
+                //Overfull log message
+                _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+
+                if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+                {
+                    channel.block(this);
+                }
+
+                if(_atomicQueueSize.get() <= _flowResumeCapacity)
+                {
+
+                    //Underfull log message
+                    _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+                   channel.unblock(this);
+                   _blockedChannels.remove(channel);
+
+                }
+
+            }
+
+
+
+        }
+    }
+
+    private void checkCapacity()
+    {
+        if(_capacity != 0L)
+        {
+            if(_atomicQueueSize.get() <= _flowResumeCapacity)
+            {
+                //Underfull log message
+                _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+
+                for(AMQChannel c : _blockedChannels.keySet())
+                {
+                    c.unblock(this);
+                    _blockedChannels.remove(c);
+                }
+            }
+        }
+    }
+
+
     public void deliverAsync()
     {
         Runner runner = new Runner(_stateChangeCount.incrementAndGet());
@@ -1544,6 +1604,7 @@
         }
     }
 
+
     public void checkMessageStatus() throws AMQException
     {
 
@@ -1651,6 +1712,27 @@
         }
     }
 
+    public long getCapacity()
+    {
+        return _capacity;
+    }
+
+    public void setCapacity(long capacity)
+    {
+        _capacity = capacity;
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return _flowResumeCapacity;
+    }
+
+    public void setFlowResumeCapacity(long flowResumeCapacity)
+    {
+        _flowResumeCapacity = flowResumeCapacity;
+    }
+
+
     public Set<NotificationCheck> getNotificationChecks()
     {
         return _notificationChecks;
@@ -1720,6 +1802,8 @@
             setMaximumMessageSize(config.getMaximumMessageSize());
             setMaximumMessageCount(config.getMaximumMessageCount());
             setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+            _capacity = config.getCapacity();
+            _flowResumeCapacity = config.getFlowResumeCapacity();
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Oct  1 18:09:10 2009
@@ -97,6 +97,7 @@
             try
             {
                 QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+                _queue.checkCapacity(_channel);
 
                 if(entry.immediateAndNotDelivered())
                 {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Oct  1 18:09:10 2009
@@ -91,6 +91,8 @@
     public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
     {
         QueueEntry entry = queue.enqueue(_storeContext, message);
+        queue.checkCapacity(_channel);
+
         
         //following check implements the functionality
         //required by the 'immediate' flag:

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Oct  1 18:09:10 2009
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.AMQException;
 import org.apache.commons.configuration.Configuration;
 
@@ -271,6 +272,15 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public boolean getBlockOnQueueFull()
+    {
+        return false;
+    }
+
+    public void setBlockOnQueueFull(boolean block)
+    {
+    }
+
     public long getMinimumAlertRepeatGap()
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
@@ -285,8 +295,7 @@
     {
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
-
-    @Override
+   
     public void checkMessageStatus() throws AMQException
     {
         //To change body of implemented methods use File | Settings | File Templates.
@@ -317,6 +326,10 @@
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
+    public void checkCapacity(AMQChannel channel)
+    {               
+    }
+
     public ManagedObject getManagedObject()
     {
         return null;  //To change body of implemented methods use File | Settings | File Templates.
@@ -327,12 +340,31 @@
         return 0;  //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    @Override
     public void setMinimumAlertRepeatGap(long value)
     {
         
     }
 
+    public long getCapacity()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setCapacity(long capacity)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public long getFlowResumeCapacity()
+    {
+        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void setFlowResumeCapacity(long flowResumeCapacity)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
     public void configure(QueueConfiguration config)
     {
         

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct  1 18:09:10 2009
@@ -115,6 +115,7 @@
 {
 
 
+
     public static final class IdToConsumerMap<C extends BasicMessageConsumer>
     {
         private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -199,16 +200,32 @@
      * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
      * not need to be attached to a queue.
      */
-    protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+    protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
 
     /**
      * The default value for mandatory flag used by producers created by this session is true. That is, server will not
      * silently drop messages where no queue is connected to the exchange for the message.
      */
-    protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+    protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+    protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+    /**
+     * The period to wait while flow controlled before sending a log message confirming that the session is still
+     * waiting on flow control being revoked
+     */
+    protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+    /**
+     * The period to wait while flow controlled before declaring a failure
+     */
+    public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+    protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+                                                                  DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
 
     protected final boolean DECLARE_QUEUES =
         Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
     protected final boolean DECLARE_EXCHANGES =
         Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
 
@@ -2274,7 +2291,7 @@
     private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
             throws JMSException
     {
-        return createProducerImpl(destination, mandatory, immediate, false);
+        return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
     }
 
     private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2745,15 +2762,26 @@
     public void setFlowControl(final boolean active)
     {
         _flowControl.setFlowControl(active);
+        _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
     }
 
-    public void checkFlowControl() throws InterruptedException
+    public void checkFlowControl() throws InterruptedException, JMSException
     {
+        long expiryTime = 0L;
         synchronized (_flowControl)
         {
-            while (!_flowControl.getFlowControl())
+            while (!_flowControl.getFlowControl() &&
+                   (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+                                     : expiryTime) >= System.currentTimeMillis() )
+            {
+
+                _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+                _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+            }
+            if(!_flowControl.getFlowControl())
             {
-                _flowControl.wait();
+                _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+                throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
             }
         }
 

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Thu Oct  1 18:09:10 2009
@@ -39,6 +39,7 @@
     private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
     private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
     private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+    private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
     private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
     private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
     private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@
 
     public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
     {
-        return false;
+        _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+        return true;
     }
 
     public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException

Copied: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (from r820624, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?p2=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java&r1=820624&r2=820739&rev=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Thu Oct  1 18:09:10 2009
@@ -20,37 +20,28 @@
 */
 package org.apache.qpid.server.queue;
 
-import junit.framework.TestCase;
-import junit.framework.Assert;
 import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.url.URLSyntaxException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 
 import javax.jms.*;
 import javax.naming.NamingException;
-import javax.naming.Context;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
-public class PriorityTest extends QpidTestCase
+public class ProducerFlowControlTest extends QpidTestCase
 {
     private static final int TIMEOUT = 1500;
 
 
-    private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+    private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
 
-    protected final String QUEUE = "PriorityQueue";
+    protected final String QUEUE = "ProducerFlowControl";
 
     private static final int MSG_COUNT = 50;
 
@@ -63,19 +54,20 @@
 
 
     private MessageConsumer consumer;
-    
+    private final AtomicInteger _sentMessages = new AtomicInteger();
+
     protected void setUp() throws Exception
     {
         super.setUp();
 
         producerConnection = getConnection();
-        producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         producerConnection.start();
-        
+
         consumerConnection = getConnection();
         consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        
+
     }
 
     protected void tearDown() throws Exception
@@ -85,126 +77,248 @@
         super.tearDown();
     }
 
-    public void testPriority() throws JMSException, NamingException, AMQException
+    public void testCapacityExceededCausesBlock()
+            throws JMSException, NamingException, AMQException, InterruptedException
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-priorities",10);
+        arguments.put("x-qpid-capacity",1000);
+        arguments.put("x-qpid-flow-resume-capacity",800);
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
         queue = new AMQQueue("amq.direct",QUEUE);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
 
-        for (int msg = 0; msg < MSG_COUNT; msg++)
-        {
-            producer.setPriority(msg % 10);
-            producer.send(nextMessage(msg, false, producerSession, producer));
-        }
-        producerSession.commit();
-        producer.close();
-        producerSession.close();
-        producerConnection.close();
+        _sentMessages.set(0);
+
+
+        // try to send 5 messages (should block after 4)
+        sendMessagesAsync(producer, producerSession, 5, 50L);
+
+        Thread.sleep(5000);
+
+        assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
 
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
-        Message received;
-        int receivedCount = 0;
-        Message previous = null;
-        int messageCount = 0;
-        while((received = consumer.receive(1000))!=null)
-        {   
-            messageCount++;
-            if(previous != null)
-            {
-                assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
-            }
 
-            previous = received;
-            receivedCount++;
-        }
 
-        assertEquals("Incorrect number of message received", 50, receivedCount);
+        consumer.receive();
+
+        Thread.sleep(1000);
+
+        assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
+
+
+        consumer.receive();
+
+        Thread.sleep(1000);
+
+        assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
+
     }
-    
-    public void testOddOrdering() throws AMQException, JMSException
+
+
+    public void testFlowControlOnCapacityResumeEqual()
+            throws JMSException, NamingException, AMQException, InterruptedException
     {
         final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-priorities",3);
+        arguments.put("x-qpid-capacity",1000);
+        arguments.put("x-qpid-flow-resume-capacity",1000);
         ((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
         queue = new AMQQueue("amq.direct",QUEUE);
         ((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
         producer = producerSession.createProducer(queue);
-        
-        // In order ABC
-        producer.setPriority(9);
-        producer.send(nextMessage(1, false, producerSession, producer));
-        producer.setPriority(4);
-        producer.send(nextMessage(2, false, producerSession, producer));
-        producer.setPriority(1);
-        producer.send(nextMessage(3, false, producerSession, producer));
-
-        // Out of order BAC
-        producer.setPriority(4);
-        producer.send(nextMessage(4, false, producerSession, producer));
-        producer.setPriority(9);
-        producer.send(nextMessage(5, false, producerSession, producer));
-        producer.setPriority(1);
-        producer.send(nextMessage(6, false, producerSession, producer));
-
-        // Out of order BCA 
-        producer.setPriority(4);
-        producer.send(nextMessage(7, false, producerSession, producer));
-        producer.setPriority(1);
-        producer.send(nextMessage(8, false, producerSession, producer));
-        producer.setPriority(9);
-        producer.send(nextMessage(9, false, producerSession, producer));
-        
-        // Reverse order CBA
-        producer.setPriority(1);
-        producer.send(nextMessage(10, false, producerSession, producer));
-        producer.setPriority(4);
-        producer.send(nextMessage(11, false, producerSession, producer));
-        producer.setPriority(9);
-        producer.send(nextMessage(12, false, producerSession, producer));
-        producerSession.commit();
-        
+
+        _sentMessages.set(0);
+
+
+        // try to send 5 messages (should block after 4)
+        sendMessagesAsync(producer, producerSession, 5, 50L);
+
+        Thread.sleep(5000);
+
+        assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
         consumer = consumerSession.createConsumer(queue);
         consumerConnection.start();
+
+
+        consumer.receive();
+
+        Thread.sleep(1000);
+
+        assertEquals("Message incorrectly sent after one message received", 5, _sentMessages.get());
         
-        Message msg = consumer.receive(TIMEOUT);
-        assertEquals(1, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(5, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(9, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(12, msg.getIntProperty("msg"));
+
+    }
+
+
+    public void testFlowControlSoak()
+            throws Exception, NamingException, AMQException, InterruptedException
+    {
+        _sentMessages.set(0);
+        final int numProducers = 10;
+        final int numMessages = 100;
+
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",6000);
+        arguments.put("x-qpid-flow-resume-capacity",3000);
+
+        ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
+
+        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
+        consumerConnection.start();
+
+        Connection[] producers = new Connection[numProducers];
+        for(int i = 0 ; i < numProducers; i ++)
+        {
+
+            producers[i] = getConnection();
+            producers[i].start();
+            Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer myproducer = session.createProducer(queue);
+            MessageSender sender = sendMessagesAsync(myproducer, session, numMessages, 50L);
+        }
+
+        consumer = consumerSession.createConsumer(queue);
+        consumerConnection.start();
+
+        for(int j = 0; j < numProducers * numMessages; j++)
+        {
         
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(2, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(4, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(7, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(11, msg.getIntProperty("msg"));
+            Message msg = consumer.receive(5000);
+            Thread.sleep(50L);
+            assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
+
+        }
+
+
+
+        Message msg = consumer.receive(500);
+        assertNull("extra message received", msg);
+
+
+        for(int i = 0; i < numProducers; i++)
+        {
+            producers[i].close();
+        }
+
+    }
+
+
+
+    public void testSendTimeout()
+            throws JMSException, NamingException, AMQException, InterruptedException
+    {
+        long origTimeoutValue = Long.getLong("qpid.flow_control_wait_failure",AMQSession.DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
+        System.setProperty("qpid.flow_control_wait_failure","3000");
+        Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        final Map<String,Object> arguments = new HashMap<String, Object>();
+        arguments.put("x-qpid-capacity",1000);
+        arguments.put("x-qpid-flow-resume-capacity",800);
+        ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+        queue = new AMQQueue("amq.direct",QUEUE);
+        ((AMQSession) session).declareAndBind((AMQDestination)queue);
+        producer = session.createProducer(queue);
+
+        _sentMessages.set(0);
+
+
+        // try to send 5 messages (should block after 4)
+        MessageSender sender = sendMessagesAsync(producer, producerSession, 5, 50L);
+
+        Thread.sleep(10000);
+
+        Exception e = sender.getException();
+
+        assertNotNull("No timeout exception on sending", e);
+
+        System.setProperty("qpid.flow_control_wait_failure",String.valueOf(origTimeoutValue));
         
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(3, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(6, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(8, msg.getIntProperty("msg"));
-        msg = consumer.receive(TIMEOUT);
-        assertEquals(10, msg.getIntProperty("msg"));
+
+
+    }
+
+    private MessageSender sendMessagesAsync(final MessageProducer producer,
+                                            final Session producerSession,
+                                            final int numMessages,
+                                            long sleepPeriod)
+    {
+        MessageSender sender = new MessageSender(producer, producerSession, numMessages,sleepPeriod);
+        new Thread(sender).start();
+        return sender;
+    }
+
+    private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+            throws JMSException
+    {
+
+        for (int msg = 0; msg < numMessages; msg++)
+        {
+            producer.send(nextMessage(msg, producerSession));
+            _sentMessages.incrementAndGet();
+
+            try
+            {
+                Thread.sleep(sleepPeriod);
+            }
+            catch (InterruptedException e)
+            {
+            }
+        }
     }
 
-    private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+    private static final byte[] BYTE_300 = new byte[300];
+
+
+    private Message nextMessage(int msg, Session producerSession) throws JMSException
     {
-        Message send = producerSession.createTextMessage("Message: " + msg);
+        BytesMessage send = producerSession.createBytesMessage();
+        send.writeBytes(BYTE_300);
         send.setIntProperty("msg", msg);
 
         return send;
     }
 
 
-}
+    private class MessageSender implements Runnable
+    {
+        private final MessageProducer _producer;
+        private final Session _producerSession;
+        private final int _numMessages;
+
+
+
+        private JMSException _exception;
+        private long _sleepPeriod;
+
+        public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+        {
+            _producer = producer;
+            _producerSession = producerSession;
+            _numMessages = numMessages;
+            _sleepPeriod = sleepPeriod;
+        }
+
+        public void run()
+        {
+            try
+            {
+                sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+            }
+            catch (JMSException e)
+            {
+                _exception = e;
+            }
+        }
+
+        public JMSException getException()
+        {
+            return _exception;
+        }
+    }
+}
\ No newline at end of file

Modified: qpid/trunk/qpid/java/test-profiles/010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/010Excludes?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/010Excludes Thu Oct  1 18:09:10 2009
@@ -96,3 +96,6 @@
 // QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path
 org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
 
+//QPID-942 : Implemented Channel.Flow based Producer Side flow control to the Java Broker (not in CPP Broker)
+org.apache.qpid.server.queue.ProducerFlowControlTest
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org