You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/07 09:27:17 UTC
svn commit: r1773033 - in /qpid/java/trunk:
broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
systests/src/test/java/org/apache/qpid/server/queue/ test-profiles/
Author: rgodfrey
Date: Wed Dec 7 09:27:16 2016
New Revision: 1773033
URL: http://svn.apache.org/viewvc?rev=1773033&view=rev
Log:
QPID-7529 : add actions to ensure producer flow control is activated when queues become oversized
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Dec 7 09:27:16 2016
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -39,6 +40,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
public class ExchangeDestination implements ReceivingDestination, SendingDestination
{
@@ -74,7 +76,9 @@ public class ExchangeDestination impleme
return OUTCOMES;
}
- public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ public Outcome send(final Message_1_0 message,
+ ServerTransaction txn,
+ final Action<MessageInstance> action)
{
final InstanceProperties instanceProperties =
new InstanceProperties()
@@ -104,7 +108,7 @@ public class ExchangeDestination impleme
routingAddress,
instanceProperties,
txn,
- null);
+ action);
if(enqueues == 0)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Wed Dec 7 09:27:16 2016
@@ -438,15 +438,16 @@ public abstract class LinkEndpoint<T ext
if(_stopped)
{
flow.setLinkCredit(UnsignedInteger.ZERO);
+ flow.setDrain(true);
_lastSentCreditLimit = _deliveryCount;
}
else
{
flow.setLinkCredit(_linkCredit);
_lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+ flow.setDrain(_drain);
}
flow.setAvailable(_available);
- flow.setDrain(_drain);
if(setTransactionId)
{
flow.setProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), _flowTransactionId));
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Wed Dec 7 09:27:16 2016
@@ -27,6 +27,7 @@ import org.apache.qpid.server.logging.Ev
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
public class NodeReceivingDestination implements ReceivingDestination
{
@@ -74,7 +76,9 @@ public class NodeReceivingDestination im
return OUTCOMES;
}
- public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ public Outcome send(final Message_1_0 message,
+ ServerTransaction txn,
+ final Action<MessageInstance> action)
{
final InstanceProperties instanceProperties =
new InstanceProperties()
@@ -102,7 +106,7 @@ public class NodeReceivingDestination im
String routingAddress;
routingAddress = getRoutingAddress(message);
- int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
+ int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, action);
if(enqueues == 0)
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Wed Dec 7 09:27:16 2016
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
{
@@ -54,7 +56,9 @@ public class QueueDestination extends Me
return OUTCOMES;
}
- public Outcome send(final Message_1_0 message, ServerTransaction txn)
+ public Outcome send(final Message_1_0 message,
+ ServerTransaction txn,
+ final Action<MessageInstance> action)
{
txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction()
@@ -66,7 +70,7 @@ public class QueueDestination extends Me
{
try
{
- getQueue().enqueue(message, null, records[0]);
+ getQueue().enqueue(message, action, records[0]);
}
finally
{
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Wed Dec 7 09:27:16 2016
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
public interface ReceivingDestination extends Destination
{
@@ -36,7 +38,7 @@ public interface ReceivingDestination ex
Outcome[] getOutcomes();
- Outcome send(Message_1_0 message, ServerTransaction txn);
+ Outcome send(Message_1_0 message, ServerTransaction txn, final Action<MessageInstance> postEnqueueAction);
int getCredit();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Wed Dec 7 09:27:16 2016
@@ -158,6 +158,10 @@ public class ReceivingLinkEndpoint exten
_remoteDrain = Boolean.TRUE.equals((Boolean) flow.getDrain());
setAvailable(flow.getAvailable());
setDeliveryCount(flow.getDeliveryCount());
+ if(isDrained())
+ {
+
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Dec 7 09:27:16 2016
@@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
@@ -68,6 +69,7 @@ import org.apache.qpid.server.model.Name
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
@@ -102,6 +104,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -150,6 +153,8 @@ public class Session_1_0 implements AMQS
private short _receivingChannel;
private short _sendingChannel = -1;
+ private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
+
// has to be a power of two
private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
@@ -1336,7 +1341,8 @@ public class Session_1_0 implements AMQS
for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
{
StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
- if (queue == link.getDestination())
+
+ if (isQueueDestinationForLink(queue, link.getDestination()))
{
endpoint.setStopped(true);
}
@@ -1345,6 +1351,13 @@ public class Session_1_0 implements AMQS
}
}
+ private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
+ {
+ return (recvDest instanceof NodeReceivingDestination && queue == ((NodeReceivingDestination) recvDest).getDestination())
+ || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
+
+ }
+
@Override
public void unblock(final Queue<?> queue)
{
@@ -1370,7 +1383,7 @@ public class Session_1_0 implements AMQS
for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
{
StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
- if (queue == link.getDestination())
+ if (isQueueDestinationForLink(queue, link.getDestination()))
{
endpoint.setStopped(false);
}
@@ -1698,6 +1711,11 @@ public class Session_1_0 implements AMQS
_unacknowledgedMessages--;
}
+ public CapacityCheckAction getCapacityCheckAction()
+ {
+ return _capacityCheckAction;
+ }
+
private class ConsumerClosedListener extends AbstractConfigurationChangeListener
{
@Override
@@ -1816,5 +1834,16 @@ public class Session_1_0 implements AMQS
MessageSource source = getAddressSpace().getAttainedMessageSource(name);
return source instanceof Queue ? (Queue<?>) source : null;
}
-
+ private final class CapacityCheckAction implements Action<MessageInstance>
+ {
+ @Override
+ public void performAction(final MessageInstance entry)
+ {
+ TransactionLogResource queue = entry.getOwningResource();
+ if(queue instanceof CapacityChecker)
+ {
+ ((CapacityChecker)queue).checkCapacity(Session_1_0.this);
+ }
+ }
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Wed Dec 7 09:27:16 2016
@@ -205,7 +205,7 @@ public class StandardReceivingLink_1_0 i
.checkAuthorizedMessagePrincipal(message.getMessageHeader().getUserId());
_destination.authorizePublish(session.getSecurityToken(), message);
- Outcome outcome = _destination.send(message, transaction);
+ Outcome outcome = _destination.send(message, transaction, session.getCapacityCheckAction());
DeliveryState resultantState;
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Wed Dec 7 09:27:16 2016
@@ -44,6 +44,8 @@ import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
@@ -336,9 +338,12 @@ public class ProducerFlowControlTest ext
_consumerConnection.start();
_consumer.receive();
-
- //perform a synchronous op on the connection
- ((AMQSession<?,?>) _consumerSession).sync();
+
+ if(!isBroker10())
+ {
+ //perform a synchronous op on the connection
+ ((AMQSession<?, ?>) _consumerSession).sync();
+ }
assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
@@ -386,10 +391,18 @@ public class ProducerFlowControlTest ext
// close blocked producer session and connection
_producerConnection.close();
- // delete queue with a consumer session
- ((AMQSession<?,?>) _consumerSession).sendQueueDelete(queueName);
+ if(!isBroker10())
+ {
+ // delete queue with a consumer session
+ ((AMQSession<?, ?>) _consumerSession).sendQueueDelete(queueName);
- _consumer = _consumerSession.createConsumer(_queue);
+ _consumer = _consumerSession.createConsumer(_queue);
+ }
+ else
+ {
+ deleteEntityUsingAmqpManagement(getTestQueueName(), _consumerSession, "org.apache.qpid.Queue");
+ createTestQueue(_consumerSession);
+ }
_consumerConnection.start();
Message message = _consumer.receive(1000l);
@@ -403,12 +416,33 @@ public class ProducerFlowControlTest ext
private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception
{
- final Map<String,Object> arguments = new HashMap<String, Object>();
- arguments.put("x-qpid-capacity",capacity);
- arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
- ((AMQSession<?,?>) session).createQueue(queueName, autoDelete, durable, false, arguments);
- _queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + durable + "'&autodelete='" + autoDelete + "'");
- ((AMQSession<?,?>) session).declareAndBind((AMQDestination) _queue);
+ if(isBroker10())
+ {
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, capacity);
+ attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, resumeCapacity);
+ attributes.put(org.apache.qpid.server.model.Queue.DURABLE, durable);
+ attributes.put(ConfiguredObject.LIFETIME_POLICY, autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : LifetimePolicy.PERMANENT.name());
+ createEntityUsingAmqpManagement(getTestQueueName(), session, "org.apache.qpid.Queue", attributes);
+ _queue = session.createQueue(queueName);
+ }
+ else
+ {
+ final Map<String, Object> arguments = new HashMap<String, Object>();
+ arguments.put("x-qpid-capacity", capacity);
+ arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+ ((AMQSession<?, ?>) session).createQueue(queueName, autoDelete, durable, false, arguments);
+ _queue = session.createQueue("direct://amq.direct/"
+ + queueName
+ + "/"
+ + queueName
+ + "?durable='"
+ + durable
+ + "'&autodelete='"
+ + autoDelete
+ + "'");
+ ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue);
+ }
}
private MessageSender sendMessagesAsync(final MessageProducer producer,
@@ -433,13 +467,20 @@ public class ProducerFlowControlTest ext
try
{
- ((AMQSession<?,?>)producerSession).sync();
- // TODO: sync a second time in order to ensure that the client has received the flow command
- // before continuing with the next message. This is required because the Broker may legally
- // send the flow command after the sync response. By sync'ing a second time we ensure that
- // the client will has seen/acted on the flow command. The test really ought not have this
- // level of information.
- ((AMQSession<?,?>)producerSession).sync();
+ if(!isBroker10())
+ {
+ ((AMQSession<?,?>)producerSession).sync();
+ // TODO: sync a second time in order to ensure that the client has received the flow command
+ // before continuing with the next message. This is required because the Broker may legally
+ // send the flow command after the sync response. By sync'ing a second time we ensure that
+ // the client will has seen/acted on the flow command. The test really ought not have this
+ // level of information.
+ ((AMQSession<?,?>)producerSession).sync();
+ }
+ else
+ {
+ producerSession.createTemporaryQueue().delete();
+ }
}
catch (QpidException e)
{
Modified: qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes Wed Dec 7 09:27:16 2016
@@ -88,5 +88,5 @@ org.apache.qpid.server.queue.ConsumerPri
org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
// Broker should issue drain to client when flow control is enforced, so that existing credit is used up (test will also need updating)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
+//org.apache.qpid.server.queue.ProducerFlowControlTest#*
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org