You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/01 20:09:11 UTC
svn commit: r820739 - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/configuration/
broker/src/main/java/org/apache/qpid/server/logging/messages/
broker/src/main/java/org/apache/qpid/s...
Author: rgodfrey
Date: Thu Oct 1 18:09:10 2009
New Revision: 820739
URL: http://svn.apache.org/viewvc?rev=820739&view=rev
Log:
QPID-942 : Add Simplistic Producer Flow Control to the java Broker / java 0-8/0-9 client
Added:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
- copied, changed from r820624, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
qpid/trunk/qpid/java/test-profiles/010Excludes
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Oct 1 18:09:10 2009
@@ -27,13 +27,12 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
@@ -42,12 +41,8 @@
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageHandleFactory;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.queue.UnauthorizedAccessException;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
+import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
@@ -59,6 +54,7 @@
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
@@ -119,6 +115,11 @@
private final AMQProtocolSession _session;
private boolean _closing;
+ private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new ConcurrentHashMap<AMQQueue, Boolean>();
+
+ private final AtomicBoolean _blocking = new AtomicBoolean(false);
+
+
private LogActor _actor;
private LogSubject _logSubject;
@@ -798,6 +799,7 @@
_actor.message(_logSubject, ChannelMessages.CHN_1002("Started"));
}
+
// This section takes two different approaches to perform to perform
// the same function. Ensuring that the Subscription has taken note
// of the change in Channel State
@@ -978,4 +980,37 @@
{
return _actor;
}
+
+    /** Marks the queue as blocking this channel; if the channel was not yet blocked, logs CHN-1005 and sends flow(false). */
+    public void block(AMQQueue queue)
+    {
+        // putIfAbsent returns null only when this queue was not already recorded as blocking
+        final boolean newlyBlocking = _blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null;
+        // && short-circuits, so the channel-wide flag is only touched for a newly recorded queue
+        if(newlyBlocking && _blocking.compareAndSet(false,true))
+        {
+            _actor.message(_logSubject, ChannelMessages.CHN_1005(queue.getName().toString()));
+            flow(false);
+        }
+    }
+
+    /** Stops the queue blocking this channel; once no queue blocks it, logs CHN-1006 and sends flow(true). */
+    public void unblock(AMQQueue queue)
+    {
+        // remove() returns the previous value or null (absent): test for null rather than unboxing,
+        // and only resume flow once no other queue is still blocking this channel
+        if(_blockingQueues.remove(queue) != null && _blockingQueues.isEmpty()
+           && _blocking.compareAndSet(true,false))
+        {
+            _actor.message(_logSubject, ChannelMessages.CHN_1006());
+            flow(true);
+        }
+    }
+
+    /** Writes a ChannelFlow method frame carrying the given flag to the session, framed with this channel's id. */
+    private void flow(boolean flow)
+    {
+        AMQMethodBody flowBody = _session.getMethodRegistry().createChannelFlowBody(flow);
+        _session.writeFrame(flowBody.generateFrame(_channelId));
+    }
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Oct 1 18:09:10 2009
@@ -108,4 +108,14 @@
return _config.getLong("minimumAlertRepeatGap", _vHostConfig.getMinimumAlertRepeatGap());
}
+ public long getCapacity()
+ {
+ return _config.getLong("capacity", _vHostConfig.getCapacity());
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("flowResumeCapacity", _vHostConfig.getFlowResumeCapacity());
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Thu Oct 1 18:09:10 2009
@@ -103,6 +103,8 @@
envVarMap.put("QPID_MAXIMUMQUEUEDEPTH", "maximumQueueDepth");
envVarMap.put("QPID_MAXIMUMMESSAGESIZE", "maximumMessageSize");
envVarMap.put("QPID_MINIMUMALERTREPEATGAP", "minimumAlertRepeatGap");
+ envVarMap.put("QPID_QUEUECAPACITY", "capacity");
+ envVarMap.put("QPID_FLOWRESUMECAPACITY", "flowResumeCapacity");
envVarMap.put("QPID_SOCKETRECEIVEBUFFER", "connector.socketReceiveBuffer");
envVarMap.put("QPID_SOCKETWRITEBUFFER", "connector.socketWriteBuffer");
envVarMap.put("QPID_TCPNODELAY", "connector.tcpNoDelay");
@@ -289,7 +291,6 @@
return conf;
}
- @Override
public void handle(Signal arg0)
{
try
@@ -507,6 +508,16 @@
return getConfig().getLong("minimumAlertRepeatGap", 0);
}
+ public long getCapacity()
+ {
+ return getConfig().getLong("capacity", 0L);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return getConfig().getLong("flowResumeCapacity", getCapacity());
+ }
+
public int getProcessors()
{
return getConfig().getInt("connector.processors", 4);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Thu Oct 1 18:09:10 2009
@@ -166,4 +166,15 @@
return _config.getLong("queues.minimumAlertRepeatGap", 0);
}
+
+ public long getCapacity()
+ {
+ return _config.getLong("queues.capacity", 0l);
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _config.getLong("queues.flowResumeCapacity", getCapacity());
+ }
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Thu Oct 1 18:09:10 2009
@@ -249,12 +249,16 @@
# 0 - bytes allowed in prefetch
# 1 - number of messagse.
CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
#Queue
# 0 - owner
# 1 - priority
QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
#Exchange
# 0 - type
@@ -269,4 +273,4 @@
#Subscription
SUB-1001 = Create[ : Durable][ : Arguments : {0}]
SUB-1002 = Close
-SUB-1003 = State : {0}
\ No newline at end of file
+SUB-1003 = State : {0}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Oct 1 18:09:10 2009
@@ -160,6 +160,17 @@
void setMinimumAlertRepeatGap(long value);
+ long getCapacity();
+
+ void setCapacity(long capacity);
+
+
+ long getFlowResumeCapacity();
+
+ void setFlowResumeCapacity(long flowResumeCapacity);
+
+
+
void deleteMessageFromTop(StoreContext storeContext) throws AMQException;
long clearQueue(StoreContext storeContext) throws AMQException;
@@ -180,6 +191,8 @@
void stop();
+ void checkCapacity(AMQChannel channel);
+
/**
* ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
* already exists.
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Oct 1 18:09:10 2009
@@ -26,11 +26,105 @@
import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Map;
+import java.util.HashMap;
+
public class AMQQueueFactory
{
public static final AMQShortString X_QPID_PRIORITIES = new AMQShortString("x-qpid-priorities");
+    /** Binds the name of a queue-declare argument to the action that applies its value to a queue. */
+    private abstract static class QueueProperty
+    {
+        // name of the argument, held as an AMQShortString
+        private final AMQShortString _argumentName;
+
+        public QueueProperty(String argumentName)
+        {
+            this._argumentName = new AMQShortString(argumentName);
+        }
+
+        /** Returns the argument name as the AMQShortString built in the constructor. */
+        public AMQShortString getArgumentName()
+        {
+            return this._argumentName;
+        }
+
+        /** Applies the raw argument value to the given queue. */
+        public abstract void setPropertyValue(AMQQueue queue, Object value);
+    }
+
+    /** A declarable queue argument that is applied to the queue as a long. */
+    private abstract static class QueueLongProperty extends QueueProperty
+    {
+        public QueueLongProperty(String argumentName)
+        {
+            super(argumentName);
+        }
+
+        /** Passes a Number on as its long value; any other value (including null) is ignored. */
+        public void setPropertyValue(AMQQueue queue, Object value)
+        {
+            if(!(value instanceof Number))
+            {
+                return;
+            }
+            setPropertyValue(queue, ((Number) value).longValue());
+        }
+
+        /** Applies the already-converted long value to the queue. */
+        abstract void setPropertyValue(AMQQueue queue, long value);
+    }
+
+ private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
+ new QueueLongProperty("x-qpid-maximum-message-age")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageAge(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-size")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageSize(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-maximum-message-count")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumMessageCount(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMinimumAlertRepeatGap(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setCapacity(value);
+ }
+ },
+ new QueueLongProperty("x-qpid-flow-resume-capacity")
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setFlowResumeCapacity(value);
+ }
+ }
+
+ };
+
+
+
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -53,6 +147,18 @@
//Register the new queue
virtualHost.getQueueRegistry().registerQueue(q);
q.configure(virtualHost.getConfiguration().getQueueConfiguration(name.asString()));
+
+ if(arguments != null)
+ {
+ for(QueueProperty p : DECLAREABLE_PROPERTIES)
+ {
+ if(arguments.containsKey(p.getArgumentName()))
+ {
+ p.setPropertyValue(q, arguments.get(p.getArgumentName()));
+ }
+ }
+ }
+
return q;
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Oct 1 18:09:10 2009
@@ -42,8 +42,7 @@
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
-
+ private final AMQMessage _message;
private Set<Subscription> _rejectedBy = null;
@@ -191,7 +190,7 @@
public boolean immediateAndNotDelivered()
{
- return _message.immediateAndNotDelivered();
+ return getMessage().immediateAndNotDelivered();
}
public void setRedelivered(boolean b)
@@ -393,4 +392,5 @@
{
return _queueEntryList;
}
+
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Oct 1 18:09:10 2009
@@ -1,11 +1,10 @@
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +34,7 @@
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.AMQChannel;
/*
*
@@ -96,6 +96,8 @@
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
+ private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
/** max allowed size(KB) of a single message */
public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -122,6 +124,10 @@
private LogSubject _logSubject;
private LogActor _logActor;
+
+ private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
@@ -629,6 +635,8 @@
throw new FailedDequeueException(_name.toString(), e);
}
+ checkCapacity();
+
}
private void decrementQueueSize(final QueueEntry entry)
@@ -1173,6 +1181,58 @@
}
}
+ public void checkCapacity(AMQChannel channel)
+ {
+ if(_capacity != 0l) // a capacity of 0 means no capacity check is made
+ {
+ if(_atomicQueueSize.get() > _capacity)
+ {
+ // Queue size is above capacity: log QUE-1003 (Overfull) with the current size and the capacity
+ _logActor.message(_logSubject, QueueMessages.QUE_1003(_atomicQueueSize.get(), _capacity));
+ // Record the channel as blocked by this queue; only a newly recorded channel is told to block
+ if(_blockedChannels.putIfAbsent(channel, Boolean.TRUE)==null)
+ {
+ channel.block(this);
+ }
+ // The size is read again: if it is now at or below the resume capacity the block is undone at once
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ // NOTE(review): presumably covers messages being dequeued while this method runs - confirm
+ // Log QUE-1004 (Underfull) with the current size and the resume capacity
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+
+ channel.unblock(this);
+ _blockedChannels.remove(channel);
+
+ }
+
+ }
+
+
+
+ }
+ }
+ /** When a capacity is set and the queue size is at or below the flow-resume capacity, unblocks every recorded channel. */
+ private void checkCapacity()
+ {
+ if(_capacity != 0L)
+ {
+ if(_atomicQueueSize.get() <= _flowResumeCapacity)
+ {
+ // Log QUE-1004 (Underfull); this is logged whether or not any channel is currently blocked
+ _logActor.message(_logSubject, QueueMessages.QUE_1004(_atomicQueueSize.get(), _flowResumeCapacity));
+ // Release each recorded channel, then forget it. Removing entries while iterating keySet() is
+ // tolerated because _blockedChannels is a ConcurrentHashMap (no ConcurrentModificationException)
+ for(AMQChannel c : _blockedChannels.keySet())
+ {
+ c.unblock(this);
+ _blockedChannels.remove(c);
+ }
+ }
+ }
+ }
+
+
public void deliverAsync()
{
Runner runner = new Runner(_stateChangeCount.incrementAndGet());
@@ -1544,6 +1604,7 @@
}
}
+
public void checkMessageStatus() throws AMQException
{
@@ -1651,6 +1712,27 @@
}
}
+ public long getCapacity()
+ {
+ return _capacity;
+ }
+
+ public void setCapacity(long capacity)
+ {
+ _capacity = capacity;
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return _flowResumeCapacity;
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ _flowResumeCapacity = flowResumeCapacity;
+ }
+
+
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1720,6 +1802,8 @@
setMaximumMessageSize(config.getMaximumMessageSize());
setMaximumMessageCount(config.getMaximumMessageCount());
setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Thu Oct 1 18:09:10 2009
@@ -97,6 +97,7 @@
try
{
QueueEntry entry = _queue.enqueue(getStoreContext(),_message);
+ _queue.checkCapacity(_channel);
if(entry.immediateAndNotDelivered())
{
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Thu Oct 1 18:09:10 2009
@@ -91,6 +91,8 @@
public void deliver(final AMQQueue queue, AMQMessage message) throws AMQException
{
QueueEntry entry = queue.enqueue(_storeContext, message);
+ queue.checkCapacity(_channel);
+
//following check implements the functionality
//required by the 'immediate' flag:
Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Thu Oct 1 18:09:10 2009
@@ -31,6 +31,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.AMQException;
import org.apache.commons.configuration.Configuration;
@@ -271,6 +272,15 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public boolean getBlockOnQueueFull()
+ {
+ return false;
+ }
+
+ public void setBlockOnQueueFull(boolean block)
+ {
+ }
+
public long getMinimumAlertRepeatGap()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
@@ -285,8 +295,7 @@
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
-
- @Override
+
public void checkMessageStatus() throws AMQException
{
//To change body of implemented methods use File | Settings | File Templates.
@@ -317,6 +326,10 @@
//To change body of implemented methods use File | Settings | File Templates.
}
+ public void checkCapacity(AMQChannel channel)
+ {
+ }
+
public ManagedObject getManagedObject()
{
return null; //To change body of implemented methods use File | Settings | File Templates.
@@ -327,12 +340,31 @@
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
public void setMinimumAlertRepeatGap(long value)
{
}
+ public long getCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setCapacity(long capacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public long getFlowResumeCapacity()
+ {
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setFlowResumeCapacity(long flowResumeCapacity)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void configure(QueueConfiguration config)
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 1 18:09:10 2009
@@ -115,6 +115,7 @@
{
+
public static final class IdToConsumerMap<C extends BasicMessageConsumer>
{
private final BasicMessageConsumer[] _fastAccessConsumers = new BasicMessageConsumer[16];
@@ -199,16 +200,32 @@
* The default value for immediate flag used by producers created by this session is false. That is, a consumer does
* not need to be attached to a queue.
*/
- protected static final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
+ protected final boolean DEFAULT_IMMEDIATE = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by producers created by this session is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
- protected static final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+ protected final boolean DEFAULT_MANDATORY = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
+
+ protected final boolean DEFAULT_WAIT_ON_SEND = Boolean.parseBoolean(System.getProperty("qpid.default_wait_on_send", "false"));
+
+ /**
+ * The period to wait while flow controlled before sending a log message confirming that the session is still
+ * waiting on flow control being revoked
+ */
+ protected final long FLOW_CONTROL_WAIT_PERIOD = Long.getLong("qpid.flow_control_wait_notify_period",5000L);
+
+ /**
+ * The period to wait while flow controlled before declaring a failure
+ */
+ public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
+ protected final long FLOW_CONTROL_WAIT_FAILURE = Long.getLong("qpid.flow_control_wait_failure",
+ DEFAULT_FLOW_CONTROL_WAIT_FAILURE);
protected final boolean DECLARE_QUEUES =
Boolean.parseBoolean(System.getProperty("qpid.declare_queues", "true"));
+
protected final boolean DECLARE_EXCHANGES =
Boolean.parseBoolean(System.getProperty("qpid.declare_exchanges", "true"));
@@ -2274,7 +2291,7 @@
private P createProducerImpl(Destination destination, boolean mandatory, boolean immediate)
throws JMSException
{
- return createProducerImpl(destination, mandatory, immediate, false);
+ return createProducerImpl(destination, mandatory, immediate, DEFAULT_WAIT_ON_SEND);
}
private P createProducerImpl(final Destination destination, final boolean mandatory,
@@ -2745,15 +2762,26 @@
public void setFlowControl(final boolean active)
{
_flowControl.setFlowControl(active);
+ _logger.warn("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
}
- public void checkFlowControl() throws InterruptedException
+ public void checkFlowControl() throws InterruptedException, JMSException
{
+ long expiryTime = 0L;
synchronized (_flowControl)
{
- while (!_flowControl.getFlowControl())
+ while (!_flowControl.getFlowControl() &&
+ (expiryTime == 0L ? (expiryTime = System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE)
+ : expiryTime) >= System.currentTimeMillis() )
+ {
+
+ _flowControl.wait(FLOW_CONTROL_WAIT_PERIOD);
+ _logger.warn("Message send delayed by " + (System.currentTimeMillis() + FLOW_CONTROL_WAIT_FAILURE - expiryTime)/1000 + "s due to broker enforced flow control");
+ }
+ if(!_flowControl.getFlowControl())
{
- _flowControl.wait();
+ _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
+ throw new JMSException("Unable to send message for " + FLOW_CONTROL_WAIT_FAILURE/1000 + " seconds due to broker enforced flow control");
}
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Thu Oct 1 18:09:10 2009
@@ -39,6 +39,7 @@
private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+ private static final ChannelFlowMethodHandler _channelFlowMethodHandler = ChannelFlowMethodHandler.getInstance();
private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
@@ -159,7 +160,8 @@
public boolean dispatchChannelFlow(ChannelFlowBody body, int channelId) throws AMQException
{
- return false;
+ _channelFlowMethodHandler.methodReceived(_session, body, channelId);
+ return true;
}
public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
Copied: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (from r820624, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?p2=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java&p1=qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java&r1=820624&r2=820739&rev=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/PriorityTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Thu Oct 1 18:09:10 2009
@@ -20,37 +20,28 @@
*/
package org.apache.qpid.server.queue;
-import junit.framework.TestCase;
-import junit.framework.Assert;
import org.apache.log4j.Logger;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import javax.jms.*;
import javax.naming.NamingException;
-import javax.naming.Context;
-import javax.naming.spi.InitialContextFactory;
-import java.util.Hashtable;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
-public class PriorityTest extends QpidTestCase
+public class ProducerFlowControlTest extends QpidTestCase
{
private static final int TIMEOUT = 1500;
- private static final Logger _logger = Logger.getLogger(PriorityTest.class);
+ private static final Logger _logger = Logger.getLogger(ProducerFlowControlTest.class);
- protected final String QUEUE = "PriorityQueue";
+ protected final String QUEUE = "ProducerFlowControl";
private static final int MSG_COUNT = 50;
@@ -63,19 +54,20 @@
private MessageConsumer consumer;
-
+ private final AtomicInteger _sentMessages = new AtomicInteger();
+
protected void setUp() throws Exception
{
super.setUp();
producerConnection = getConnection();
- producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerConnection.start();
-
+
consumerConnection = getConnection();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
}
protected void tearDown() throws Exception
@@ -85,126 +77,248 @@
super.tearDown();
}
- public void testPriority() throws JMSException, NamingException, AMQException
+ public void testCapacityExceededCausesBlock()
+ throws JMSException, NamingException, AMQException, InterruptedException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-priorities",10);
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",800);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = new AMQQueue("amq.direct",QUEUE);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
- for (int msg = 0; msg < MSG_COUNT; msg++)
- {
- producer.setPriority(msg % 10);
- producer.send(nextMessage(msg, false, producerSession, producer));
- }
- producerSession.commit();
- producer.close();
- producerSession.close();
- producerConnection.close();
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
- Message received;
- int receivedCount = 0;
- Message previous = null;
- int messageCount = 0;
- while((received = consumer.receive(1000))!=null)
- {
- messageCount++;
- if(previous != null)
- {
- assertTrue("Messages arrived in unexpected order " + messageCount + " " + previous.getIntProperty("msg") + " " + received.getIntProperty("msg") + " " + previous.getJMSPriority() + " " + received.getJMSPriority(), (previous.getJMSPriority() > received.getJMSPriority()) || ((previous.getJMSPriority() == received.getJMSPriority()) && previous.getIntProperty("msg") < received.getIntProperty("msg")) );
- }
- previous = received;
- receivedCount++;
- }
- assertEquals("Incorrect number of message received", 50, receivedCount);
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message incorrectly sent after one message received", 4, _sentMessages.get());
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ assertEquals("Message not sent after two messages received", 5, _sentMessages.get());
+
}
-
- public void testOddOrdering() throws AMQException, JMSException
+
+
+ /**
+ * Declares the queue with x-qpid-capacity and x-qpid-flow-resume-capacity both set to 1000,
+ * expects the asynchronous sender to have sent 4 of its 5 messages before blocking, and
+ * expects the 5th message to have been sent once a single message has been consumed.
+ */
+ public void testFlowControlOnCapacityResumeEqual()
+ throws JMSException, NamingException, AMQException, InterruptedException
{
final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-priorities",3);
+ arguments.put("x-qpid-capacity",1000);
+ arguments.put("x-qpid-flow-resume-capacity",1000);
((AMQSession) producerSession).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
queue = new AMQQueue("amq.direct",QUEUE);
((AMQSession) producerSession).declareAndBind((AMQDestination)queue);
producer = producerSession.createProducer(queue);
-
- // In order ABC
- producer.setPriority(9);
- producer.send(nextMessage(1, false, producerSession, producer));
- producer.setPriority(4);
- producer.send(nextMessage(2, false, producerSession, producer));
- producer.setPriority(1);
- producer.send(nextMessage(3, false, producerSession, producer));
-
- // Out of order BAC
- producer.setPriority(4);
- producer.send(nextMessage(4, false, producerSession, producer));
- producer.setPriority(9);
- producer.send(nextMessage(5, false, producerSession, producer));
- producer.setPriority(1);
- producer.send(nextMessage(6, false, producerSession, producer));
-
- // Out of order BCA
- producer.setPriority(4);
- producer.send(nextMessage(7, false, producerSession, producer));
- producer.setPriority(1);
- producer.send(nextMessage(8, false, producerSession, producer));
- producer.setPriority(9);
- producer.send(nextMessage(9, false, producerSession, producer));
-
- // Reverse order CBA
- producer.setPriority(1);
- producer.send(nextMessage(10, false, producerSession, producer));
- producer.setPriority(4);
- producer.send(nextMessage(11, false, producerSession, producer));
- producer.setPriority(9);
- producer.send(nextMessage(12, false, producerSession, producer));
- producerSession.commit();
-
+
+ _sentMessages.set(0);
+
+
+ // try to send 5 messages (should block after 4)
+ sendMessagesAsync(producer, producerSession, 5, 50L);
+
+ Thread.sleep(5000);
+
+ assertEquals("Incorrect number of message sent before blocking", 4, _sentMessages.get());
+
consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
+
+
+ consumer.receive();
+
+ Thread.sleep(1000);
+
+ // Resume capacity equals capacity, so the blocked 5th send is expected to go through
+ // after a single receive; the failure text therefore reports a message that was NOT sent.
+ assertEquals("Message not sent after one message received", 5, _sentMessages.get());
- Message msg = consumer.receive(TIMEOUT);
- assertEquals(1, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(5, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(9, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(12, msg.getIntProperty("msg"));
+
+ }
+
+
+ /**
+ * Starts 10 producer connections that each asynchronously send 100 messages to a queue
+ * declared with x-qpid-capacity 6000 and x-qpid-flow-resume-capacity 3000, then checks
+ * that exactly numProducers * numMessages messages can be consumed.
+ *
+ * The producer connections are closed in a finally block so that a failed assertion
+ * does not leave them (and their sender threads) open for the following tests.
+ */
+ public void testFlowControlSoak() throws Exception
+ {
+     _sentMessages.set(0);
+     final int numProducers = 10;
+     final int numMessages = 100;
+
+     final Map<String,Object> arguments = new HashMap<String, Object>();
+     arguments.put("x-qpid-capacity",6000);
+     arguments.put("x-qpid-flow-resume-capacity",3000);
+
+     ((AMQSession) consumerSession).createQueue(new AMQShortString(QUEUE), false, false, false, arguments);
+
+     queue = new AMQQueue("amq.direct",QUEUE);
+     ((AMQSession) consumerSession).declareAndBind((AMQDestination)queue);
+     consumerConnection.start();
+
+     final Connection[] producers = new Connection[numProducers];
+     try
+     {
+         for (int i = 0; i < numProducers; i++)
+         {
+             producers[i] = getConnection();
+             producers[i].start();
+             Session session = producers[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+             MessageProducer myproducer = session.createProducer(queue);
+             sendMessagesAsync(myproducer, session, numMessages, 50L);
+         }
+
+         // consumerConnection was already started above, so it is not started again here
+         consumer = consumerSession.createConsumer(queue);
+
+         for (int j = 0; j < numProducers * numMessages; j++)
+         {
- msg = consumer.receive(TIMEOUT);
- assertEquals(2, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(4, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(7, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(11, msg.getIntProperty("msg"));
+             Message msg = consumer.receive(5000);
+             Thread.sleep(50L);
+             assertNotNull("Message not received("+j+"), sent: "+_sentMessages.get(), msg);
+         }
+
+         Message extra = consumer.receive(500);
+         assertNull("extra message received", extra);
+     }
+     finally
+     {
+         // Entries are null if getConnection() failed part-way through the set-up loop
+         for (Connection connection : producers)
+         {
+             if (connection != null)
+             {
+                 connection.close();
+             }
+         }
+     }
+ }
+
+
+
+ /**
+ * Sets the system property qpid.flow_control_wait_failure to 3000, fills a queue declared
+ * with x-qpid-capacity 1000 / x-qpid-flow-resume-capacity 800 without consuming from it,
+ * and expects the asynchronous sender to have recorded a JMSException after 10 seconds.
+ *
+ * The property is JVM-wide, so its previous state is restored in a finally block; otherwise
+ * a failed assertion would leave the shortened timeout in place for every later test.
+ */
+ public void testSendTimeout()
+         throws JMSException, NamingException, AMQException, InterruptedException
+ {
+     final String origTimeoutValue = System.getProperty("qpid.flow_control_wait_failure");
+     System.setProperty("qpid.flow_control_wait_failure","3000");
+     try
+     {
+         // NOTE(review): a fresh session is created after the property is set, presumably
+         // because the client reads the timeout when the session is created - confirm.
+         Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         final Map<String,Object> arguments = new HashMap<String, Object>();
+         arguments.put("x-qpid-capacity",1000);
+         arguments.put("x-qpid-flow-resume-capacity",800);
+         ((AMQSession) session).createQueue(new AMQShortString(QUEUE), true, false, false, arguments);
+         queue = new AMQQueue("amq.direct",QUEUE);
+         ((AMQSession) session).declareAndBind((AMQDestination)queue);
+         producer = session.createProducer(queue);
+
+         _sentMessages.set(0);
+
+         // try to send 5 messages (should block after 4); the messages are created on the
+         // same session that owns the producer rather than on the session from setUp()
+         MessageSender sender = sendMessagesAsync(producer, session, 5, 50L);
+
+         Thread.sleep(10000);
+
+         Exception e = sender.getException();
+
+         assertNotNull("No timeout exception on sending", e);
+     }
+     finally
+     {
+         if (origTimeoutValue == null)
+         {
+             // The property was not set before the test, so remove it again
+             System.clearProperty("qpid.flow_control_wait_failure");
+         }
+         else
+         {
+             System.setProperty("qpid.flow_control_wait_failure", origTimeoutValue);
+         }
+     }
- msg = consumer.receive(TIMEOUT);
- assertEquals(3, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(6, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(8, msg.getIntProperty("msg"));
- msg = consumer.receive(TIMEOUT);
- assertEquals(10, msg.getIntProperty("msg"));
+ }
+
+ /**
+ * Wraps the given producer/session pair in a MessageSender, runs it on a newly started
+ * thread and returns the MessageSender so the caller can query it later.
+ */
+ private MessageSender sendMessagesAsync(final MessageProducer producer,
+                                         final Session producerSession,
+                                         final int numMessages,
+                                         long sleepPeriod)
+ {
+     final MessageSender task = new MessageSender(producer, producerSession, numMessages, sleepPeriod);
+     final Thread worker = new Thread(task);
+     worker.start();
+     return task;
+ }
+
+ /**
+ * Sends numMessages messages through the producer, incrementing _sentMessages after each
+ * send() returns and sleeping for sleepPeriod milliseconds between sends.
+ *
+ * If the thread is interrupted while sleeping, the interrupt status is restored and no
+ * further messages are sent.
+ *
+ * @throws JMSException if creating or sending a message fails
+ */
+ private void sendMessages(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+         throws JMSException
+ {
+     for (int msg = 0; msg < numMessages; msg++)
+     {
+         producer.send(nextMessage(msg, producerSession));
+         _sentMessages.incrementAndGet();
+
+         try
+         {
+             Thread.sleep(sleepPeriod);
+         }
+         catch (InterruptedException e)
+         {
+             // Do not swallow the interrupt: re-assert it for the caller and stop sending
+             Thread.currentThread().interrupt();
+             return;
+         }
+     }
}
- private Message nextMessage(int msg, boolean first, Session producerSession, MessageProducer producer) throws JMSException
+ private static final byte[] BYTE_300 = new byte[300];
+
+
+ private Message nextMessage(int msg, Session producerSession) throws JMSException
{
- Message send = producerSession.createTextMessage("Message: " + msg);
+ BytesMessage send = producerSession.createBytesMessage();
+ send.writeBytes(BYTE_300);
send.setIntProperty("msg", msg);
return send;
}
-}
+ /**
+ * Runnable that sends a fixed number of messages via sendMessages() and records the
+ * JMSException, if any, that ended the run so the test thread can inspect it afterwards.
+ */
+ private class MessageSender implements Runnable
+ {
+     private final MessageProducer _producer;
+     private final Session _producerSession;
+     private final int _numMessages;
+     private final long _sleepPeriod;
+
+     // Written by the sender thread and read by the test thread through getException();
+     // volatile makes the stored exception visible across those threads.
+     private volatile JMSException _exception;
+
+     public MessageSender(MessageProducer producer, Session producerSession, int numMessages, long sleepPeriod)
+     {
+         _producer = producer;
+         _producerSession = producerSession;
+         _numMessages = numMessages;
+         _sleepPeriod = sleepPeriod;
+     }
+
+     public void run()
+     {
+         try
+         {
+             sendMessages(_producer, _producerSession, _numMessages, _sleepPeriod);
+         }
+         catch (JMSException e)
+         {
+             _exception = e;
+         }
+     }
+
+     /**
+      * @return the JMSException caught by run(), or null if none has been caught
+      */
+     public JMSException getException()
+     {
+         return _exception;
+     }
+ }
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/test-profiles/010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/010Excludes?rev=820739&r1=820738&r2=820739&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/010Excludes Thu Oct 1 18:09:10 2009
@@ -96,3 +96,6 @@
// QPID-2118 : 0-10 Java client has differrent error handling to 0-8 code path
org.apache.qpid.test.client.message.SelectorTest#testRuntimeSelectorError
+//QPID-942 : Channel.Flow based producer-side flow control is implemented only in the Java Broker (not in the CPP Broker)
+org.apache.qpid.server.queue.ProducerFlowControlTest
+
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org