You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2020/09/09 23:51:54 UTC
[qpid-broker-j] branch master updated (59b25c8 -> 5c2274f)
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.
from 59b25c8 QPID-8454:[Broker-J] Add missing license
new 6e498b9 QPID-8451:[Broker-J]Enforce producer flow control on new connections
new 3a74fc4 QPID-8451:[Broker-J] Block session before adding it into blocked sesssion set
new 5c2274f QPID-8448: Make sure that actual connection is closed
The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../ProducerFlowControlOverflowPolicyHandler.java | 2 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 30 ++++++++---
.../TlsOrPlainConnectionFactory.java | 31 ++++++++---
.../extensions/queue/ProducerFlowControlTest.java | 60 ++++++++++++++++++++++
4 files changed, 108 insertions(+), 15 deletions(-)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[qpid-broker-j] 03/03: QPID-8448: Make sure that actual connection
is closed
Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 5c2274f8d1d45f442b82d91ca1a247cfe9f76688
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Wed Sep 9 00:49:56 2020 +0100
QPID-8448: Make sure that actual connection is closed
This closes #59
---
.../TlsOrPlainConnectionFactory.java | 31 +++++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java
index 5c094fb..c3d0ac7 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java
@@ -127,13 +127,11 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
@Override
public void addListener(Listener listener)
{
- if (_actualConnection == null)
+ _listeners.add(listener);
+ AbstractConnection actualConnection = _actualConnection;
+ if (actualConnection != null)
{
- _listeners.add(listener);
- }
- else
- {
- _actualConnection.addListener(listener);
+ actualConnection.addListener(listener);
}
}
@@ -141,6 +139,11 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
public void removeListener(Listener listener)
{
_listeners.remove(listener);
+ AbstractConnection actualConnection = _actualConnection;
+ if (actualConnection != null)
+ {
+ actualConnection.removeListener(listener);
+ }
}
@Override
@@ -157,6 +160,11 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
listener.onOpened(this);
}
+ final AbstractConnection actualConnection = _actualConnection;
+ if (actualConnection != null)
+ {
+ actualConnection.onOpen();
+ }
}
@Override
@@ -167,6 +175,12 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
LOG.debug("onClose {}", this);
}
+ final AbstractConnection actualConnection = _actualConnection;
+ if (actualConnection != null)
+ {
+ actualConnection.onClose();
+ }
+
for (Listener listener : _listeners)
{
listener.onClosed(this);
@@ -350,7 +364,10 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
private SslConnection newSslConnection(final Connector connector, final EndPoint endPoint, final SSLEngine engine)
{
- return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine);
+ final SslConnection sslConnection =
+ new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine);
+ TlsOrPlainConnectionFactory.this.configure(sslConnection, _connector, _endPoint);
+ return sslConnection;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[qpid-broker-j] 01/03: QPID-8451:[Broker-J]Enforce producer flow
control on new connections
Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 6e498b95738441a15eea8e207b13b61d0e3e24a7
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Tue Sep 8 15:17:42 2020 +0530
QPID-8451:[Broker-J]Enforce producer flow control on new connections
This closes #58
---
.../qpid/server/protocol/v1_0/Session_1_0.java | 30 ++++++++---
.../extensions/queue/ProducerFlowControlTest.java | 60 ++++++++++++++++++++++
2 files changed, 83 insertions(+), 7 deletions(-)
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index d003e3a..076dc43 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1358,6 +1358,28 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
_associatedLinkEndpoints.forEach(linkedEnpoint -> linkedEnpoint.receiveComplete());
}
+ private void checkMessageDestinationFlowForReceivingLinkEndpoint(final LinkEndpoint<?,?> endpoint)
+ {
+ if (endpoint instanceof StandardReceivingLinkEndpoint)
+ {
+ final ReceivingDestination destination =
+ ((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination();
+ if (_blockingEntities.contains(this)
+ || _blockingEntities.contains(destination))
+ {
+ endpoint.setStopped(true);
+ }
+ else if (destination.getMessageDestination() instanceof Queue)
+ {
+ Queue<?> queue = (Queue<?>)destination.getMessageDestination();
+ if (queue.isQueueFlowStopped())
+ {
+ queue.checkCapacity();
+ }
+ }
+ }
+ }
+
private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
{
@@ -1394,17 +1416,11 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
else
{
- if (endpoint instanceof StandardReceivingLinkEndpoint
- && (_blockingEntities.contains(Session_1_0.this)
- || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint)
- .getReceivingDestination())))
- {
- endpoint.setStopped(true);
- }
if (!_endpointToOutputHandle.containsKey(endpoint))
{
_endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
+ checkMessageDestinationFlowForReceivingLinkEndpoint(endpoint);
endpoint.sendAttach();
endpoint.start();
}
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
index 918f833..8b6e2ca 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -20,10 +20,14 @@
*/
package org.apache.qpid.systests.jms_1_1.extensions.queue;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
import java.util.HashMap;
import java.util.Map;
@@ -43,6 +47,8 @@ import javax.jms.Session;
import org.junit.Test;
import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.Utils;
public class ProducerFlowControlTest extends OverflowPolicyTestBase
{
@@ -303,6 +309,60 @@ public class ProducerFlowControlTest extends OverflowPolicyTestBase
}
}
+ @Test
+ public void testEnforceFlowControlOnNewConnection() throws Exception
+ {
+ assumeThat(getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
+ final Queue testQueue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.PRODUCER_FLOW_CONTROL, 0, 1, 0);
+ final Connection producerConnection1 = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ Utils.sendMessages(producerConnection1, testQueue, 2);
+ assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000));
+ }
+ finally
+ {
+ producerConnection1.close();
+ }
+
+ final Connection producerConnection2 = getConnectionBuilder().setSyncPublish(true).build();
+ try
+ {
+ final Session producerSession = producerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final MessageProducer producer = producerSession.createProducer(testQueue);
+ final MessageSender messageSender = sendMessagesAsync(producer, producerSession, 1);
+
+ assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000));
+ assertEquals("Incorrect number of message sent before blocking",
+ 0,
+ messageSender.getNumberOfSentMessages());
+
+ final Connection consumerConnection = getConnection();
+ try
+ {
+ Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = consumerSession.createConsumer(testQueue);
+ consumerConnection.start();
+
+ Message message = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message);
+
+ Message message2 = consumer.receive(getReceiveTimeout());
+ assertNotNull("Message is not received", message2);
+ assertTrue("Message sending is not finished", messageSender.getSendLatch()
+ .await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
+ }
+ finally
+ {
+ consumerConnection.close();
+ }
+ }
+ finally
+ {
+ producerConnection2.close();
+ }
+ }
+
private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
{
final Map<String, Object> attributes = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[qpid-broker-j] 02/03: QPID-8451:[Broker-J] Block session before
adding it into blocked sesssion set
Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 3a74fc4976c3e49c4841c8dce76924b59033e294
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Sep 10 00:00:27 2020 +0100
QPID-8451:[Broker-J] Block session before adding it into blocked sesssion set
This should eliminate the possibility of leaving session in blocked state
when a consumer thread unblock the blocked sessions and remove them from the blocked session set
---
.../qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
index b2559b5..51491e1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
@@ -189,8 +189,8 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH
}
final AMQPSession<?, ?> session = sessionPrincipal.getSession();
- _blockedSessions.add(session);
session.block(_queue);
+ _blockedSessions.add(session);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org