You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2020/09/12 21:30:45 UTC

[qpid-broker-j] 12/17: QPID-8451:[Broker-J]Enforce producer flow control on new connections

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit cf5fa8cd5606efcb38044223dc6e336e0c2539b2
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Tue Sep 8 15:17:42 2020 +0530

    QPID-8451:[Broker-J]Enforce producer flow control on new connections
    
    This closes #58
    
    (cherry picked from commit 6e498b95738441a15eea8e207b13b61d0e3e24a7)
---
 .../qpid/server/protocol/v1_0/Session_1_0.java     | 30 ++++++++---
 .../extensions/queue/ProducerFlowControlTest.java  | 60 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index d003e3a..076dc43 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1358,6 +1358,28 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         _associatedLinkEndpoints.forEach(linkedEnpoint -> linkedEnpoint.receiveComplete());
     }
 
+    private void checkMessageDestinationFlowForReceivingLinkEndpoint(final LinkEndpoint<?,?> endpoint) // Applies producer flow control to an endpoint; invoked by the endpoint-creation callback just before the attach is sent.
+    {
+        if (endpoint instanceof StandardReceivingLinkEndpoint) // any other kind of endpoint is left untouched
+        {
+            final ReceivingDestination destination =
+                    ((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination();
+            if (_blockingEntities.contains(this) // this session is already recorded as blocked...
+                || _blockingEntities.contains(destination)) // ...or the endpoint's destination is
+            {
+                endpoint.setStopped(true); // mark the endpoint as stopped
+            }
+            else if (destination.getMessageDestination() instanceof Queue) // nothing recorded as blocking, but the target is a Queue
+            {
+                Queue<?> queue = (Queue<?>)destination.getMessageDestination();
+                if (queue.isQueueFlowStopped()) // the queue itself reports that its flow is stopped
+                {
+                    queue.checkCapacity(); // NOTE(review): presumably re-evaluates capacity so this new session gets blocked too - confirm against Queue.checkCapacity()
+                }
+            }
+        }
+    }
+
     private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
     {
 
@@ -1394,17 +1416,11 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                     }
                     else
                     {
-                        if (endpoint instanceof StandardReceivingLinkEndpoint
-                            && (_blockingEntities.contains(Session_1_0.this)
-                                || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint)
-                                                                      .getReceivingDestination())))
-                        {
-                            endpoint.setStopped(true);
-                        }
 
                         if (!_endpointToOutputHandle.containsKey(endpoint))
                         {
                             _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
+                            checkMessageDestinationFlowForReceivingLinkEndpoint(endpoint);
                             endpoint.sendAttach();
                             endpoint.start();
                         }
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
index 918f833..8b6e2ca 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -20,10 +20,14 @@
 */
 package org.apache.qpid.systests.jms_1_1.extensions.queue;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -43,6 +47,8 @@ import javax.jms.Session;
 import org.junit.Test;
 
 import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.Utils;
 
 public class ProducerFlowControlTest extends OverflowPolicyTestBase
 {
@@ -303,6 +309,60 @@ public class ProducerFlowControlTest extends OverflowPolicyTestBase
         }
     }
 
+    @Test
+    public void testEnforceFlowControlOnNewConnection() throws Exception // Checks that a producer on a fresh connection is held back when the queue's flow is already stopped.
+    {
+        assumeThat(getProtocol(), is(equalTo(Protocol.AMQP_1_0))); // test is skipped unless the protocol under test is AMQP 1.0
+        final Queue testQueue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.PRODUCER_FLOW_CONTROL, 0, 1, 0); // NOTE(review): the trailing 0, 1, 0 are presumably the overflow limits - confirm against createQueueWithOverflowPolicy
+        final Connection producerConnection1 = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Utils.sendMessages(producerConnection1, testQueue, 2); // first connection publishes 2 messages
+            assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000)); // wait for the queue's queueFlowStopped attribute to become true
+        }
+        finally
+        {
+            producerConnection1.close(); // the remainder of the test uses a second, new connection
+        }
+
+        final Connection producerConnection2 = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            final Session producerSession = producerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = producerSession.createProducer(testQueue);
+            final MessageSender messageSender = sendMessagesAsync(producer, producerSession, 1); // try to publish 1 message asynchronously from the new connection
+
+            assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000)); // the attribute is still expected to read true
+            assertEquals("Incorrect number of message sent before blocking",
+                         0, // the new producer must not have completed any send
+                         messageSender.getNumberOfSentMessages());
+
+            final Connection consumerConnection = getConnection();
+            try
+            {
+                Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = consumerSession.createConsumer(testQueue);
+                consumerConnection.start();
+
+                Message message = consumer.receive(getReceiveTimeout()); // consume the first message from the queue
+                assertNotNull("Message is not received", message);
+
+                Message message2 = consumer.receive(getReceiveTimeout()); // consume a second message
+                assertNotNull("Message is not received", message2);
+                assertTrue("Message sending is not finished", messageSender.getSendLatch() // after two messages are consumed the pending async send is expected to finish
+                                                                           .await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection2.close();
+        }
+    }
+
     private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
     {
         final Map<String, Object> attributes = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org