You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/12/07 09:27:17 UTC

svn commit: r1773033 - in /qpid/java/trunk: broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ systests/src/test/java/org/apache/qpid/server/queue/ test-profiles/

Author: rgodfrey
Date: Wed Dec  7 09:27:16 2016
New Revision: 1773033

URL: http://svn.apache.org/viewvc?rev=1773033&view=rev
Log:
QPID-7529 : add actions to ensure producer flow control is activated when queues become oversized

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
    qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Wed Dec  7 09:27:16 2016
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -39,6 +40,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 public class ExchangeDestination implements ReceivingDestination, SendingDestination
 {
@@ -74,7 +76,9 @@ public class ExchangeDestination impleme
         return OUTCOMES;
     }
 
-    public Outcome send(final Message_1_0 message, ServerTransaction txn)
+    public Outcome send(final Message_1_0 message,
+                        ServerTransaction txn,
+                        final Action<MessageInstance> action)
     {
         final InstanceProperties instanceProperties =
             new InstanceProperties()
@@ -104,7 +108,7 @@ public class ExchangeDestination impleme
                                       routingAddress,
                                       instanceProperties,
                                       txn,
-                                      null);
+                                      action);
 
         if(enqueues == 0)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Wed Dec  7 09:27:16 2016
@@ -438,15 +438,16 @@ public abstract class LinkEndpoint<T ext
             if(_stopped)
             {
                 flow.setLinkCredit(UnsignedInteger.ZERO);
+                flow.setDrain(true);
                 _lastSentCreditLimit = _deliveryCount;
             }
             else
             {
                 flow.setLinkCredit(_linkCredit);
                 _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+                flow.setDrain(_drain);
             }
             flow.setAvailable(_available);
-            flow.setDrain(_drain);
             if(setTransactionId)
             {
                 flow.setProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), _flowTransactionId));

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Wed Dec  7 09:27:16 2016
@@ -27,6 +27,7 @@ import org.apache.qpid.server.logging.Ev
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -38,6 +39,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 public class NodeReceivingDestination implements ReceivingDestination
 {
@@ -74,7 +76,9 @@ public class NodeReceivingDestination im
         return OUTCOMES;
     }
 
-    public Outcome send(final Message_1_0 message, ServerTransaction txn)
+    public Outcome send(final Message_1_0 message,
+                        ServerTransaction txn,
+                        final Action<MessageInstance> action)
     {
         final InstanceProperties instanceProperties =
             new InstanceProperties()
@@ -102,7 +106,7 @@ public class NodeReceivingDestination im
         String routingAddress;
         routingAddress = getRoutingAddress(message);
 
-        int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, null);
+        int enqueues = _destination.send(message, routingAddress, instanceProperties, txn, action);
 
         if(enqueues == 0)
         {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Wed Dec  7 09:27:16 2016
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -33,6 +34,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 public class QueueDestination extends MessageSourceDestination implements SendingDestination, ReceivingDestination
 {
@@ -54,7 +56,9 @@ public class QueueDestination extends Me
         return OUTCOMES;
     }
 
-    public Outcome send(final Message_1_0 message, ServerTransaction txn)
+    public Outcome send(final Message_1_0 message,
+                        ServerTransaction txn,
+                        final Action<MessageInstance> action)
     {
 
         txn.enqueue(getQueue(),message, new ServerTransaction.EnqueueAction()
@@ -66,7 +70,7 @@ public class QueueDestination extends Me
             {
                 try
                 {
-                    getQueue().enqueue(message, null, records[0]);
+                    getQueue().enqueue(message, action, records[0]);
                 }
                 finally
                 {

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Wed Dec  7 09:27:16 2016
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.security.SecurityToken;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 
 public interface ReceivingDestination extends Destination
 {
@@ -36,7 +38,7 @@ public interface ReceivingDestination ex
 
     Outcome[] getOutcomes();
 
-    Outcome send(Message_1_0 message, ServerTransaction txn);
+    Outcome send(Message_1_0 message, ServerTransaction txn, final Action<MessageInstance> postEnqueueAction);
 
     int getCredit();
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Wed Dec  7 09:27:16 2016
@@ -158,6 +158,10 @@ public class ReceivingLinkEndpoint exten
         _remoteDrain = Boolean.TRUE.equals((Boolean) flow.getDrain());
         setAvailable(flow.getAvailable());
         setDeliveryCount(flow.getDeliveryCount());
+        if(isDrained())
+        {
+
+        }
     }
 
 

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Wed Dec  7 09:27:16 2016
@@ -56,6 +56,7 @@ import org.apache.qpid.server.logging.Lo
 import org.apache.qpid.server.logging.messages.ChannelMessages;
 import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
 import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageInstanceConsumer;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
@@ -68,6 +69,7 @@ import org.apache.qpid.server.model.Name
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.protocol.ConsumerListener;
 import org.apache.qpid.server.protocol.LinkRegistry;
 import org.apache.qpid.server.protocol.v1_0.codec.QpidByteBufferUtils;
@@ -102,6 +104,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.security.SecurityToken;
+import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
@@ -150,6 +153,8 @@ public class Session_1_0 implements AMQS
     private short _receivingChannel;
     private short _sendingChannel = -1;
 
+    private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
+
 
     // has to be a power of two
     private static final int DEFAULT_SESSION_BUFFER_SIZE = 1 << 11;
@@ -1336,7 +1341,8 @@ public class Session_1_0 implements AMQS
             for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
             {
                 StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
-                if (queue == link.getDestination())
+
+                if (isQueueDestinationForLink(queue, link.getDestination()))
                 {
                     endpoint.setStopped(true);
                 }
@@ -1345,6 +1351,13 @@ public class Session_1_0 implements AMQS
         }
     }
 
+    private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
+    {
+        return (recvDest instanceof NodeReceivingDestination && queue == ((NodeReceivingDestination) recvDest).getDestination())
+                || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
+
+    }
+
     @Override
     public void unblock(final Queue<?> queue)
     {
@@ -1370,7 +1383,7 @@ public class Session_1_0 implements AMQS
             for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
             {
                 StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
-                if (queue == link.getDestination())
+                if (isQueueDestinationForLink(queue, link.getDestination()))
                 {
                     endpoint.setStopped(false);
                 }
@@ -1698,6 +1711,11 @@ public class Session_1_0 implements AMQS
         _unacknowledgedMessages--;
     }
 
+    public CapacityCheckAction getCapacityCheckAction()
+    {
+        return _capacityCheckAction;
+    }
+
     private class ConsumerClosedListener extends AbstractConfigurationChangeListener
     {
         @Override
@@ -1816,5 +1834,16 @@ public class Session_1_0 implements AMQS
         MessageSource source = getAddressSpace().getAttainedMessageSource(name);
         return source instanceof Queue ? (Queue<?>) source : null;
     }
-
+    private final class CapacityCheckAction implements Action<MessageInstance>
+    {
+        @Override
+        public void performAction(final MessageInstance entry)
+        {
+            TransactionLogResource queue = entry.getOwningResource();
+            if(queue instanceof CapacityChecker)
+            {
+                ((CapacityChecker)queue).checkCapacity(Session_1_0.this);
+            }
+        }
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java Wed Dec  7 09:27:16 2016
@@ -205,7 +205,7 @@ public class StandardReceivingLink_1_0 i
                             .checkAuthorizedMessagePrincipal(message.getMessageHeader().getUserId());
                     _destination.authorizePublish(session.getSecurityToken(), message);
 
-                    Outcome outcome = _destination.send(message, transaction);
+                    Outcome outcome = _destination.send(message, transaction, session.getCapacityCheckAction());
 
                     DeliveryState resultantState;
 

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java Wed Dec  7 09:27:16 2016
@@ -44,6 +44,8 @@ import org.apache.qpid.QpidException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.server.logging.AbstractTestLogging;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.systest.rest.RestTestHelper;
 import org.apache.qpid.test.utils.TestBrokerConfiguration;
 
@@ -336,9 +338,12 @@ public class ProducerFlowControlTest ext
         _consumerConnection.start();
         
         _consumer.receive();
-        
-        //perform a synchronous op on the connection
-        ((AMQSession<?,?>) _consumerSession).sync();
+
+        if(!isBroker10())
+        {
+            //perform a synchronous op on the connection
+            ((AMQSession<?, ?>) _consumerSession).sync();
+        }
 
         assertFalse("Queue should not be overfull", isFlowStopped(queueUrl));
 
@@ -386,10 +391,18 @@ public class ProducerFlowControlTest ext
         // close blocked producer session and connection
         _producerConnection.close();
 
-        // delete queue with a consumer session
-        ((AMQSession<?,?>) _consumerSession).sendQueueDelete(queueName);
+        if(!isBroker10())
+        {
+            // delete queue with a consumer session
+            ((AMQSession<?, ?>) _consumerSession).sendQueueDelete(queueName);
 
-        _consumer = _consumerSession.createConsumer(_queue);
+            _consumer = _consumerSession.createConsumer(_queue);
+        }
+        else
+        {
+            deleteEntityUsingAmqpManagement(getTestQueueName(), _consumerSession, "org.apache.qpid.Queue");
+            createTestQueue(_consumerSession);
+        }
         _consumerConnection.start();
 
         Message message = _consumer.receive(1000l);
@@ -403,12 +416,33 @@ public class ProducerFlowControlTest ext
 
     private void createAndBindQueueWithFlowControlEnabled(Session session, String queueName, int capacity, int resumeCapacity, boolean durable, boolean autoDelete) throws Exception
     {
-        final Map<String,Object> arguments = new HashMap<String, Object>();
-        arguments.put("x-qpid-capacity",capacity);
-        arguments.put("x-qpid-flow-resume-capacity",resumeCapacity);
-        ((AMQSession<?,?>) session).createQueue(queueName, autoDelete, durable, false, arguments);
-        _queue = session.createQueue("direct://amq.direct/" + queueName + "/" + queueName + "?durable='" + durable + "'&autodelete='" + autoDelete + "'");
-        ((AMQSession<?,?>) session).declareAndBind((AMQDestination) _queue);
+        if(isBroker10())
+        {
+            final Map<String, Object> attributes = new HashMap<>();
+            attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES, capacity);
+            attributes.put(org.apache.qpid.server.model.Queue.QUEUE_FLOW_RESUME_SIZE_BYTES, resumeCapacity);
+            attributes.put(org.apache.qpid.server.model.Queue.DURABLE, durable);
+            attributes.put(ConfiguredObject.LIFETIME_POLICY, autoDelete ? LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS.name() : LifetimePolicy.PERMANENT.name());
+            createEntityUsingAmqpManagement(getTestQueueName(), session, "org.apache.qpid.Queue", attributes);
+            _queue = session.createQueue(queueName);
+        }
+        else
+        {
+            final Map<String, Object> arguments = new HashMap<String, Object>();
+            arguments.put("x-qpid-capacity", capacity);
+            arguments.put("x-qpid-flow-resume-capacity", resumeCapacity);
+            ((AMQSession<?, ?>) session).createQueue(queueName, autoDelete, durable, false, arguments);
+            _queue = session.createQueue("direct://amq.direct/"
+                                         + queueName
+                                         + "/"
+                                         + queueName
+                                         + "?durable='"
+                                         + durable
+                                         + "'&autodelete='"
+                                         + autoDelete
+                                         + "'");
+            ((AMQSession<?, ?>) session).declareAndBind((AMQDestination) _queue);
+        }
     }
 
     private MessageSender sendMessagesAsync(final MessageProducer producer,
@@ -433,13 +467,20 @@ public class ProducerFlowControlTest ext
 
             try
             {
-                ((AMQSession<?,?>)producerSession).sync();
-                // TODO: sync a second time in order to ensure that the client has received the flow command
-                // before continuing with the next message.  This is required because the Broker may legally
-                // send the flow command after the sync response. By sync'ing a second time we ensure that
-                // the client will has seen/acted on the flow command.  The test really ought not have this
-                // level of information.
-                ((AMQSession<?,?>)producerSession).sync();
+                if(!isBroker10())
+                {
+                    ((AMQSession<?,?>)producerSession).sync();
+                    // TODO: sync a second time in order to ensure that the client has received the flow command
+                    // before continuing with the next message.  This is required because the Broker may legally
+                    // send the flow command after the sync response. By sync'ing a second time we ensure that
+                    // the client will have seen/acted on the flow command.  The test really ought not have this
+                    // level of information.
+                    ((AMQSession<?,?>)producerSession).sync();
+                }
+                else
+                {
+                    producerSession.createTemporaryQueue().delete();
+                }
             }
             catch (QpidException e)
             {

Modified: qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes?rev=1773033&r1=1773032&r2=1773033&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes (original)
+++ qpid/java/trunk/test-profiles/Java10BrokenTestsExcludes Wed Dec  7 09:27:16 2016
@@ -88,5 +88,5 @@ org.apache.qpid.server.queue.ConsumerPri
 org.apache.qpid.server.queue.ArrivalTimeFilterTest#*
 
 // Broker should issue drain to client when flow control is enforced, so that existing credit is used up (test will also need updating)
-org.apache.qpid.server.queue.ProducerFlowControlTest#*
+//org.apache.qpid.server.queue.ProducerFlowControlTest#*
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org