You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2020/09/12 21:30:45 UTC
[qpid-broker-j] 12/17: QPID-8451:[Broker-J]Enforce producer flow
control on new connections
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 8.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit cf5fa8cd5606efcb38044223dc6e336e0c2539b2
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Tue Sep 8 15:17:42 2020 +0530
QPID-8451:[Broker-J]Enforce producer flow control on new connections
This closes #58
(cherry picked from commit 6e498b95738441a15eea8e207b13b61d0e3e24a7)
---
.../qpid/server/protocol/v1_0/Session_1_0.java | 30 ++++++++---
.../extensions/queue/ProducerFlowControlTest.java | 60 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 7 deletions(-)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index d003e3a..076dc43 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1358,6 +1358,28 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
_associatedLinkEndpoints.forEach(linkedEnpoint -> linkedEnpoint.receiveComplete());
}
+ private void checkMessageDestinationFlowForReceivingLinkEndpoint(final LinkEndpoint<?,?> endpoint)
+ {
+ if (endpoint instanceof StandardReceivingLinkEndpoint)
+ {
+ final ReceivingDestination destination =
+ ((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination();
+ if (_blockingEntities.contains(this)
+ || _blockingEntities.contains(destination))
+ {
+ endpoint.setStopped(true);
+ }
+ else if (destination.getMessageDestination() instanceof Queue)
+ {
+ Queue<?> queue = (Queue<?>)destination.getMessageDestination();
+ if (queue.isQueueFlowStopped())
+ {
+ queue.checkCapacity();
+ }
+ }
+ }
+ }
+
private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
{
@@ -1394,17 +1416,11 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
else
{
- if (endpoint instanceof StandardReceivingLinkEndpoint
- && (_blockingEntities.contains(Session_1_0.this)
- || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint)
- .getReceivingDestination())))
- {
- endpoint.setStopped(true);
- }
if (!_endpointToOutputHandle.containsKey(endpoint))
{
_endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
+ checkMessageDestinationFlowForReceivingLinkEndpoint(endpoint);
endpoint.sendAttach();
endpoint.start();
}
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
index 918f833..8b6e2ca 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.systests.jms_1_1.extensions.queue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
import java.util.HashMap;
import java.util.Map;
@@ -43,6 +47,8 @@ import javax.jms.Session;
import org.junit.Test;
import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.Utils;
public class ProducerFlowControlTest extends OverflowPolicyTestBase
{
@@ -303,6 +309,60 @@ public class ProducerFlowControlTest extends OverflowPolicyTestBase
}
}
+ @Test
+ public void testEnforceFlowControlOnNewConnection() throws Exception
+ {
+ assumeThat(getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
+ final Queue testQueue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.PRODUCER_FLOW_CONTROL, 0, 1, 0);
+ final Connection producerConnection1 = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ Utils.sendMessages(producerConnection1, testQueue, 2);
+ assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000));
+ }
+ finally
+ {
+ producerConnection1.close();
+ }
+
+ final Connection producerConnection2 = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ final Session producerSession = producerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer = producerSession.createProducer(testQueue);
+ final MessageSender messageSender = sendMessagesAsync(producer, producerSession, 1);
+
+ assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000));
+ assertEquals("Incorrect number of message sent before blocking",
+ 0,
+ messageSender.getNumberOfSentMessages());
+
+ final Connection consumerConnection = getConnection();
+ try
+ {
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(testQueue);
+ consumerConnection.start();
+
+ Message message = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message);
+
+ Message message2 = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message2);
+ assertTrue("Message sending is not finished", messageSender.getSendLatch()
+ .await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+ }
+ finally
+ {
+ producerConnection2.close();
+ }
+ }
+
private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
{
final Map<String, Object> attributes = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org