You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2020/09/09 23:51:54 UTC

[qpid-broker-j] branch master updated (59b25c8 -> 5c2274f)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 59b25c8  QPID-8454:[Broker-J] Add missing license
     new 6e498b9  QPID-8451:[Broker-J]Enforce producer flow control on new connections
     new 3a74fc4  QPID-8451:[Broker-J] Block session before adding it into blocked sesssion set
     new 5c2274f  QPID-8448: Make sure that actual connection is closed

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ProducerFlowControlOverflowPolicyHandler.java  |  2 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java     | 30 ++++++++---
 .../TlsOrPlainConnectionFactory.java               | 31 ++++++++---
 .../extensions/queue/ProducerFlowControlTest.java  | 60 ++++++++++++++++++++++
 4 files changed, 108 insertions(+), 15 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 03/03: QPID-8448: Make sure that actual connection is closed

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 5c2274f8d1d45f442b82d91ca1a247cfe9f76688
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Wed Sep 9 00:49:56 2020 +0100

    QPID-8448: Make sure that actual connection is closed
    
    This closes #59
---
 .../TlsOrPlainConnectionFactory.java               | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java
index 5c094fb..c3d0ac7 100644
--- a/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java
+++ b/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/portunification/TlsOrPlainConnectionFactory.java
@@ -127,13 +127,11 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
         @Override
         public void addListener(Listener listener)
         {
-            if (_actualConnection == null)
+            _listeners.add(listener);
+            AbstractConnection actualConnection = _actualConnection;
+            if (actualConnection != null)
             {
-                _listeners.add(listener);
-            }
-            else
-            {
-                _actualConnection.addListener(listener);
+                actualConnection.addListener(listener);
             }
         }
 
@@ -141,6 +139,11 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
         public void removeListener(Listener listener)
         {
             _listeners.remove(listener);
+            AbstractConnection actualConnection = _actualConnection;
+            if (actualConnection != null)
+            {
+                actualConnection.removeListener(listener);
+            }
         }
 
         @Override
@@ -157,6 +160,11 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
                 listener.onOpened(this);
             }
 
+            final AbstractConnection actualConnection = _actualConnection;
+            if (actualConnection != null)
+            {
+                actualConnection.onOpen();
+            }
         }
 
         @Override
@@ -167,6 +175,12 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
                 LOG.debug("onClose {}", this);
             }
 
+            final AbstractConnection actualConnection = _actualConnection;
+            if (actualConnection != null)
+            {
+                actualConnection.onClose();
+            }
+
             for (Listener listener : _listeners)
             {
                 listener.onClosed(this);
@@ -350,7 +364,10 @@ public class TlsOrPlainConnectionFactory extends AbstractConnectionFactory
 
         private SslConnection newSslConnection(final Connector connector, final EndPoint endPoint, final SSLEngine engine)
         {
-            return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine);
+            final SslConnection sslConnection =
+                    new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine);
+            TlsOrPlainConnectionFactory.this.configure(sslConnection, _connector, _endPoint);
+            return sslConnection;
         }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/03: QPID-8451:[Broker-J]Enforce producer flow control on new connections

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 6e498b95738441a15eea8e207b13b61d0e3e24a7
Author: Dedeepya T <de...@yahoo.co.in>
AuthorDate: Tue Sep 8 15:17:42 2020 +0530

    QPID-8451:[Broker-J]Enforce producer flow control on new connections
    
    This closes #58
---
 .../qpid/server/protocol/v1_0/Session_1_0.java     | 30 ++++++++---
 .../extensions/queue/ProducerFlowControlTest.java  | 60 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index d003e3a..076dc43 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -1358,6 +1358,28 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         _associatedLinkEndpoints.forEach(linkedEnpoint -> linkedEnpoint.receiveComplete());
     }
 
+    private void checkMessageDestinationFlowForReceivingLinkEndpoint(final LinkEndpoint<?,?> endpoint)
+    {
+        if (endpoint instanceof StandardReceivingLinkEndpoint)
+        {
+            final ReceivingDestination destination =
+                    ((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination();
+            if (_blockingEntities.contains(this)
+                || _blockingEntities.contains(destination))
+            {
+                endpoint.setStopped(true);
+            }
+            else if (destination.getMessageDestination() instanceof Queue)
+            {
+                Queue<?> queue = (Queue<?>)destination.getMessageDestination();
+                if (queue.isQueueFlowStopped())
+                {
+                    queue.checkCapacity();
+                }
+            }
+        }
+    }
+
     private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
     {
 
@@ -1394,17 +1416,11 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                     }
                     else
                     {
-                        if (endpoint instanceof StandardReceivingLinkEndpoint
-                            && (_blockingEntities.contains(Session_1_0.this)
-                                || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint)
-                                                                      .getReceivingDestination())))
-                        {
-                            endpoint.setStopped(true);
-                        }
 
                         if (!_endpointToOutputHandle.containsKey(endpoint))
                         {
                             _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle());
+                            checkMessageDestinationFlowForReceivingLinkEndpoint(endpoint);
                             endpoint.sendAttach();
                             endpoint.start();
                         }
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
index 918f833..8b6e2ca 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/queue/ProducerFlowControlTest.java
@@ -20,10 +20,14 @@
 */
 package org.apache.qpid.systests.jms_1_1.extensions.queue;
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeThat;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -43,6 +47,8 @@ import javax.jms.Session;
 import org.junit.Test;
 
 import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.Utils;
 
 public class ProducerFlowControlTest extends OverflowPolicyTestBase
 {
@@ -303,6 +309,60 @@ public class ProducerFlowControlTest extends OverflowPolicyTestBase
         }
     }
 
+    @Test
+    public void testEnforceFlowControlOnNewConnection() throws Exception
+    {
+        assumeThat(getProtocol(), is(equalTo(Protocol.AMQP_1_0)));
+        final Queue testQueue = createQueueWithOverflowPolicy(getTestName(), OverflowPolicy.PRODUCER_FLOW_CONTROL, 0, 1, 0);
+        final Connection producerConnection1 = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            Utils.sendMessages(producerConnection1, testQueue, 2);
+            assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000));
+        }
+        finally
+        {
+            producerConnection1.close();
+        }
+
+        final Connection producerConnection2 = getConnectionBuilder().setSyncPublish(true).build();
+        try
+        {
+            final Session producerSession = producerConnection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final MessageProducer producer = producerSession.createProducer(testQueue);
+            final MessageSender messageSender = sendMessagesAsync(producer, producerSession, 1);
+
+            assertTrue("Flow is not stopped", awaitAttributeValue(testQueue.getQueueName(), "queueFlowStopped", true, 5000));
+            assertEquals("Incorrect number of message sent before blocking",
+                         0,
+                         messageSender.getNumberOfSentMessages());
+
+            final Connection consumerConnection = getConnection();
+            try
+            {
+                Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer consumer = consumerSession.createConsumer(testQueue);
+                consumerConnection.start();
+
+                Message message = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message);
+
+                Message message2 = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message is not received", message2);
+                assertTrue("Message sending is not finished", messageSender.getSendLatch()
+                                                                           .await(getReceiveTimeout(), TimeUnit.MILLISECONDS));
+            }
+            finally
+            {
+                consumerConnection.close();
+            }
+        }
+        finally
+        {
+            producerConnection2.close();
+        }
+    }
+
     private void setFlowLimits(final String queueName, final int blockValue, final int resumeValue) throws Exception
     {
         final Map<String, Object> attributes = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/03: QPID-8451:[Broker-J] Block session before adding it into blocked sesssion set

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 3a74fc4976c3e49c4841c8dce76924b59033e294
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Sep 10 00:00:27 2020 +0100

    QPID-8451:[Broker-J] Block session before adding it into blocked sesssion set
    
    This should eliminate the possibility of leaving session in blocked state
    when a consumer thread unblock the blocked sessions and remove them from the blocked session set
---
 .../qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
index b2559b5..51491e1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
@@ -189,8 +189,8 @@ public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyH
                         }
 
                         final AMQPSession<?, ?> session = sessionPrincipal.getSession();
-                        _blockedSessions.add(session);
                         session.block(_queue);
+                        _blockedSessions.add(session);
                     }
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org