Posted to dev@activemq.apache.org by "Stirling Chow (JIRA)" <ji...@apache.org> on 2011/05/19 19:24:47 UTC

[jira] [Created] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
-----------------------------------------------------------------------------------------------------------------------------

                 Key: AMQ-3331
                 URL: https://issues.apache.org/jira/browse/AMQ-3331
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker, Test Cases, Transport
    Affects Versions: 5.5.0
            Reporter: Stirling Chow


Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
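
To make the topology concrete, here is a minimal, illustrative sketch of two brokers joined by such a bridge (ports, limits, and names are assumptions, not taken from the attached test cases):

{code:title=Illustrative two-broker setup}
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;

public class BridgeFlowControlExample {
    public static void main(String[] args) throws Exception {
        // Broker B: small per-queue memory limit so Q2 fills quickly and producer flow control engages.
        BrokerService brokerB = new BrokerService();
        brokerB.setBrokerName("brokerB");
        brokerB.addConnector("tcp://localhost:61617");
        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");
        policy.setMemoryLimit(10 * 1024);
        policy.setProducerFlowControl(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        brokerB.setDestinationPolicy(policyMap);
        brokerB.start();

        // Broker A: demand forwarding bridge to Broker B over TCP, with a network prefetch of 1.
        BrokerService brokerA = new BrokerService();
        brokerA.setBrokerName("brokerA");
        brokerA.addConnector("tcp://localhost:61616");
        NetworkConnector bridge = brokerA.addNetworkConnector("static:(tcp://localhost:61617)");
        bridge.setPrefetchSize(1);
        brokerA.start();
    }
}
{code}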

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
  1) Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
  2) Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.

This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====


[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
  1) Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
  2) Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.

This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
  1) Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
  2) Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.

This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====


> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
>   1) Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
>   2) Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====


[jira] [Resolved] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Gary Tully (Resolved) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully resolved AMQ-3331.
-----------------------------

    Resolution: Fixed

Variant of patch applied with thanks in: http://svn.apache.org/viewvc?rev=1186813&view=rev

Added a boolean attribute, {{alwaysSyncSend}}, to the network connector. This allows the behavior for persistent messages to be applied to non-persistent messages as well.
It does not make sense to have an 'always async' mode, as that could lead to lost persistent messages; as a result, the need for the enum goes away.
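
Assuming the new attribute is exposed as a standard bean property on the network connector (the exact accessor name is in the linked commit), enabling it would look roughly like this:

{code}
// Hypothetical usage sketch; brokerA is an existing org.apache.activemq.broker.BrokerService
// and the setAlwaysSyncSend(...) accessor name is assumed from the attribute name above.
NetworkConnector bridge = brokerA.addNetworkConnector("static:(tcp://remotehost:61616)");
bridge.setAlwaysSyncSend(true); // forward non-persistent messages with a request/response, as persistent ones are
{code}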

Also, I agree with the assessment of the use of the responseRequired flag in the choice of sync/async send:
it should be based on the persistence attribute of the message, and I have fixed that.
This is vital so that messages sent in transactions are not sent async.
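
A sketch of that corrected choice (illustrative only; the committed change is authoritative, and the {{isAlwaysSyncSend()}} accessor name is an assumption):

{code}
// Sketch: decide sync vs async forwarding from the message itself rather than responseRequired.
// Persistent messages, transacted messages, and bridges with alwaysSyncSend=true wait for the
// remote response before acking locally; everything else may be forwarded as a oneway.
// (Ack/dequeue bookkeeping from serviceLocalCommand(...) omitted.)
if (configuration.isAlwaysSyncSend() || message.isPersistent() || message.getTransactionId() != null) {
    remoteBroker.asyncRequest(message, callback);
} else {
    remoteBroker.oneway(message);
}
{code}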

Thanks for the great test case; it made life much easier.
                
> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>            Assignee: Gary Tully
>             Fix For: 5.6.0
>
>         Attachments: AMQ-3331.patch, NetworkBridgeProducerFlowControlPrePatchTest.java, NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing all messages from Broker A onto Broker B's queues.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of the choice between {{remoteBroker.oneway(message);}} and {{remoteBroker.asyncRequest(message, callback);}}.
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer no longer cares about it and is not waiting for a response.  When a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the inflight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because producer flow control on one queue can have the serious side effect of blocking the entire network bridge, I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
> Alternatively (though unnecessary if the {{remoteBroker.oneway(message);}} path is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
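> For illustration, a rough sketch of how the proposed switch could sit inside {{serviceLocalCommand(...)}} (hypothetical shape only; ack/dequeue bookkeeping omitted, and the attached patch is authoritative):
> {code}
> switch (configuration.getRemoteDispatchType()) {
> case ALWAYS_ASYNC:
>     remoteBroker.oneway(message);                    // fire-and-forget for every message
>     break;
> case ALWAYS_SYNC:
>     remoteBroker.asyncRequest(message, callback);    // always wait for the remote response
>     break;
> case AUTO:
> default:
>     if (message.isResponseRequired()) {
>         remoteBroker.asyncRequest(message, callback);
>     } else {
>         remoteBroker.oneway(message);
>     }
> }
> {code}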
> A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is because, where necessary, the expectations have been modified to validate the (bad) current behaviour.  Ideally, the bad behaviour should cause a test case failure if you feel that blocking the entire network bridge is a bug.


        

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing all messages from Broker A onto Broker B's queues.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
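
For reference, a minimal client-side sketch of the sends that map onto these two paths (broker URL and queue name are illustrative, not from the attached tests):

{code}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerSendModes {
    public static void main(String[] args) throws Exception {
        Connection connection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        connection.start();
        // Persistent + non-transacted sends go out as sync requests (responseRequired=true);
        // non-persistent sends, or any send on a transacted session, go out as oneways (responseRequired=false).
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // true => transacted => oneway
        MessageProducer producer = session.createProducer(session.createQueue("Q2"));
        producer.setDeliveryMode(DeliveryMode.PERSISTENT); // NON_PERSISTENT reproduces the blocking case
        producer.send(session.createTextMessage("payload"));
        connection.close();
    }
}
{code}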



  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing all messages from Broker A onto Broker B's queues.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.




> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing all messages from Broker A onto Broker B's queues.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.


[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Attachment: AMQ-3331.patch

Patch that adds remoteDispatchType to NetworkBridgeConfiguration.

> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>         Attachments: AMQ-3331.patch, NetworkBridgeProducerFlowControlPrePatchTest.java, NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing all messages from Broker A onto Broker B's queues.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of the choice between {{remoteBroker.oneway(message);}} and {{remoteBroker.asyncRequest(message, callback);}}.
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer no longer cares about it and is not waiting for a response.  When a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the inflight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because producer flow control on one queue can have the serious side effect of blocking the entire network bridge, I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
> Alternatively (though unnecessary if the {{remoteBroker.oneway(message);}} path is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
> A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is because, where necessary, the expectations have been modified to validate the (bad) current behaviour.  Ideally, the bad behaviour should cause a test case failure if you feel that blocking the entire network bridge is a bug.


[jira] [Assigned] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Gary Tully (Assigned) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gary Tully reassigned AMQ-3331:
-------------------------------

    Assignee: Gary Tully
    
> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>            Assignee: Gary Tully
>             Fix For: 5.6.0
>
>         Attachments: AMQ-3331.patch, NetworkBridgeProducerFlowControlPrePatchTest.java, NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages depending on whether Broker A produces them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) or inside one: outside, the behaviour is appropriate, with Q1 independent of Q2, but inside, the behaviour is the same as in the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else {
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer ceases to care and is no longer waiting for a response.  Once a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the in-flight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because the blocking of the network bridge by producer flow control on one queue can have a serious side effect (blocking the bridge completely), I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
> Alternatively (and this becomes unnecessary if the {{remoteBroker.oneway(message);}} call is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
> A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is because its expectations were adjusted to validate the bad behaviour where necessary.  Ideally, the bad behaviour should cause the test case to fail if you agree that blocking the entire network bridge is a bug.
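
As a rough illustration of the dispatch decision that the three {{remoteDispatchType}} values describe, here is a minimal sketch.  The enum and method names below are illustrative guesses only; the authoritative names and wiring are in the attached AMQ-3331.patch.

{code:title=Illustrative sketch of the remoteDispatchType decision (not taken from the patch)}
// Hypothetical names for illustration only; the real field/accessor names live in AMQ-3331.patch.
enum RemoteDispatchType { AUTO, ALWAYS_SYNC, ALWAYS_ASYNC }

class DispatchDecision {
    /**
     * Returns true when the bridge should forward with remoteBroker.asyncRequest(message, callback)
     * (the response is withheld on Broker B and the transport thread stays free), and false when it
     * should forward with remoteBroker.oneway(message) (waitForSpace can then block the whole bridge).
     */
    static boolean forwardAsRequest(RemoteDispatchType type, boolean responseRequired) {
        switch (type) {
            case ALWAYS_SYNC:  return true;
            case ALWAYS_ASYNC: return false;
            case AUTO:
            default:           return responseRequired; // the current behaviour described above
        }
    }
}
{code}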

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
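
For readers who want to reproduce the symptom, a minimal two-broker setup along the following lines should exhibit it.  The ports, memory limit, and prefetch value are illustrative assumptions and are not taken from the attached test classes.

{code:title=Illustrative two-broker setup (assumed ports and limits, not from the attached tests)}
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;

public class TwoBrokerFlowControlSetup {
    public static void main(String[] args) throws Exception {
        // Broker B: consumer side, with a small per-queue memory limit so Q2 can fill up
        // and trigger producer flow control against the bridge.
        BrokerService brokerB = new BrokerService();
        brokerB.setBrokerName("brokerB");
        brokerB.setPersistent(false);
        brokerB.setUseJmx(false);
        PolicyEntry perQueue = new PolicyEntry();
        perQueue.setMemoryLimit(50 * 1024);      // illustrative limit
        perQueue.setProducerFlowControl(true);
        PolicyMap policy = new PolicyMap();
        policy.setDefaultEntry(perQueue);
        brokerB.setDestinationPolicy(policy);
        brokerB.addConnector("tcp://localhost:61617");
        brokerB.start();

        // Broker A: producer side, with a demand forwarding bridge to Broker B.
        BrokerService brokerA = new BrokerService();
        brokerA.setBrokerName("brokerA");
        brokerA.setPersistent(false);
        brokerA.setUseJmx(false);
        brokerA.addConnector("tcp://localhost:61616");
        NetworkConnector bridge = brokerA.addNetworkConnector("static:(tcp://localhost:61617)");
        bridge.setPrefetchSize(1);               // matches the "network prefetch size of 1" assumption above
        brokerA.start();
    }
}
{code}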

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
} else {
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}
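
Note that the first branch above can also be taken when the sending connection declares a producer window ({{producerInfo.getWindowSize() > 0}}).  The bridge's internal producer does not use a window (which is why the blocking branch can be reached at all), but an ordinary client producer can opt into the non-blocking path; a minimal sketch with an illustrative value:

{code:title=Enabling a producer window on a client connection factory (illustrative)}
import org.apache.activemq.ActiveMQConnectionFactory;

public class ProducerWindowExample {
    public static ActiveMQConnectionFactory windowedFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // With a window, asynchronous sends are throttled per producer once the window fills,
        // rather than blocking the connection's transport thread in waitForSpace().
        factory.setProducerWindowSize(1024 * 1024); // 1 MB window, illustrative
        return factory;
    }
}
{code}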



  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code}
                // We can avoid blocking due to low usage if the producer is
                // sending
                // a sync message or if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code}
                } else {

                    if (memoryUsage.isFull()) {
                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
                    }
{code}





> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
> } else {
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}


[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else {
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
if (!message.isResponseRequired()) {
    
    // If the message was originally sent using async
    // send, we will preserve that QOS
    // by bridging it using an async send (small chance
    // of message loss).
    try {
        remoteBroker.oneway(message);
        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
        dequeueCounter.incrementAndGet();
    } finally {
        sub.decrementOutstandingResponses();
    }
    
} else {
    
    // The message was not sent using async send, so we
    // should only ack the local
    // broker when we get confirmation that the remote
    // broker has received the message.
    ResponseCallback callback = new ResponseCallback() {
        public void onCompletion(FutureResponse future) {
            try {
                Response response = future.getResult();
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse) response;
                    serviceLocalException(er.getException());
                } else {
                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                    dequeueCounter.incrementAndGet();
                }   
            } catch (IOException e) {
                serviceLocalException(e);
            } finally {
                sub.decrementOutstandingResponses();
            }
        }
    };
    
    remoteBroker.asyncRequest(message, callback);
}
{code}
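
To make the persistent/transactional distinction concrete, the following sketch shows which client sends end up synchronous ({{responseRequired=true}}) and which end up asynchronous, assuming default connection factory settings ({{useAsyncSend}} and {{alwaysSyncSend}} left at their defaults).  The broker URL and queue names are illustrative.

{code:title=Illustrative client sends and the resulting send mode}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendModeExample {
    public static void main(String[] args) throws Exception {
        Connection connection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        connection.start();

        // Persistent + non-transacted: sent synchronously, so the message (and its bridged copy)
        // carries responseRequired=true and takes the non-blocking flow control branch on Broker B.
        Session autoAck = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer syncProducer = autoAck.createProducer(autoAck.createQueue("Q1"));
        syncProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        syncProducer.send(autoAck.createTextMessage("persistent, non-transacted"));

        // Non-persistent, or any send inside a transaction: sent asynchronously, so the bridged copy
        // carries responseRequired=false and can block Broker B's transport thread in waitForSpace().
        Session transacted = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer asyncProducer = transacted.createProducer(transacted.createQueue("Q2"));
        asyncProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        asyncProducer.send(transacted.createTextMessage("non-persistent and/or transacted"));
        transacted.commit();

        connection.close();
    }
}
{code}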




  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else {
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
                        
                        if (!message.isResponseRequired()) {
                            
                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                dequeueCounter.incrementAndGet();
                            } finally {
                                sub.decrementOutstandingResponses();
                            }
                            
                        } else {
                            
                            // The message was not sent using async send, so we
                            // should only ack the local
                            // broker when we get confirmation that the remote
                            // broker has received the message.
                            ResponseCallback callback = new ResponseCallback() {
                                public void onCompletion(FutureResponse future) {
                                    try {
                                        Response response = future.getResult();
                                        if (response.isException()) {
                                            ExceptionResponse er = (ExceptionResponse) response;
                                            serviceLocalException(er.getException());
                                        } else {
                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                            dequeueCounter.incrementAndGet();
                                        }   
                                    } catch (IOException e) {
                                        serviceLocalException(e);
                                    } finally {
                                        sub.decrementOutstandingResponses();
                                    }
                                }
                            };
                            
                            remoteBroker.asyncRequest(message, callback);
                            
                        }
{code}





> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else {
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}


[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code}
                // We can avoid blocking due to low usage if the producer is
                // sending
                // a sync message or if it is using a producer window
                if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code}
                } else {

                    if (memoryUsage.isFull()) {
                        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
                    }
{code}




  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====


> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends these outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code}
>                 // We can avoid blocking due to low usage if the producer is
>                 // sending
>                 // a sync message or if it is using a producer window
>                 if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code}
>                 } else {
>                     if (memoryUsage.isFull()) {
>                         waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>                     }
> {code}


[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else {
    if (memoryUsage.isFull()) {
        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                + message.getProducerId() + ") stopped to prevent flooding "
                + getActiveMQDestination().getQualifiedName() + "."
                + " See http://activemq.apache.org/producer-flow-control.html for more info");
    }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has {{responseRequired=true}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has {{responseRequired=false}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
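
To make the producer-side distinction concrete, here is a minimal, illustrative JMS snippet (not from the issue; the broker URL and queue names are assumptions).  With default connection factory settings, a persistent send outside a transaction is synchronous ({{responseRequired=true}}), while non-persistent or transacted sends are asynchronous ({{responseRequired=false}}):

{code:title=Sketch: producer-side choices that set responseRequired}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class ProducerModes {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616"); // Broker A (assumed URL)
        Connection connection = factory.createConnection();
        connection.start();

        // Non-transacted session: persistent sends are synchronous, so the bridge
        // forwards them with a deferred response and is not blocked; non-persistent
        // sends are asynchronous and hit the blocking path described above.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(new ActiveMQQueue("Q1"));
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(session.createTextMessage("persistent, sync send"));
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(session.createTextMessage("non-persistent, async send"));

        // Transacted session: each send is asynchronous (the broker response is only
        // required at commit), so even persistent messages behave like the async case.
        Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer txProducer = txSession.createProducer(new ActiveMQQueue("Q2"));
        txProducer.send(txSession.createTextMessage("persistent, transacted"));
        txSession.commit();

        connection.close();
    }
}
{code}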



  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
} else {
    if (memoryUsage.isFull()) {
        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                + message.getProducerId() + ") stopped to prevent flooding "
                + getActiveMQDestination().getQualifiedName() + "."
                + " See http://activemq.apache.org/producer-flow-control.html for more info");
    }
{code}




> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has {{responseRequired=true}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has {{responseRequired=false}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.

This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====


> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else {
    if (memoryUsage.isFull()) {
        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                + message.getProducerId() + ") stopped to prevent flooding "
                + getActiveMQDestination().getQualifiedName() + "."
                + " See http://activemq.apache.org/producer-flow-control.html for more info");
    }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has {{responseRequired=true}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has {{responseRequired=false}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
if (!message.isResponseRequired()) {
    
    // If the message was originally sent using async
    // send, we will preserve that QOS
    // by bridging it using an async send (small chance
    // of message loss).
    try {
        remoteBroker.oneway(message);
        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
        dequeueCounter.incrementAndGet();
    } finally {
        sub.decrementOutstandingResponses();
    }
    
} else {
    
    // The message was not sent using async send, so we
    // should only ack the local
    // broker when we get confirmation that the remote
    // broker has received the message.
    ResponseCallback callback = new ResponseCallback() {
        public void onCompletion(FutureResponse future) {
            try {
                Response response = future.getResult();
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse) response;
                    serviceLocalException(er.getException());
                } else {
                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                    dequeueCounter.incrementAndGet();
                }   
            } catch (IOException e) {
                serviceLocalException(e);
            } finally {
                sub.decrementOutstandingResponses();
            }
        }
    };
    
    remoteBroker.asyncRequest(message, callback);
}
{code}

The apparent preservation of {{responseRequired}} therefore comes down to the choice between {{remoteBroker.oneway(message);}} and {{remoteBroker.asyncRequest(message, callback);}}.
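
For readers unfamiliar with the transport API, the following minimal sketch (my own illustration, not bridge code) contrasts the two forwarding styles; {{remote}} stands in for the bridge's remote transport and the callback body is elided:

{code:title=Sketch: oneway versus asyncRequest forwarding}
import java.io.IOException;

import org.apache.activemq.command.Message;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;

public class ForwardingStyles {

    // Fire-and-forget: no response is awaited.  Because the forwarded message has
    // responseRequired=false, Queue#send on the receiving broker may end up in
    // waitForSpace(), blocking the single transport thread.
    static void forwardOneway(Transport remote, Message message) throws IOException {
        remote.oneway(message);
    }

    // Request with a deferred response: the receiving broker can hold the response
    // until the destination has room, leaving the transport thread free to service
    // messages destined for other queues.
    static void forwardAsyncRequest(Transport remote, Message message) throws IOException {
        remote.asyncRequest(message, new ResponseCallback() {
            public void onCompletion(FutureResponse future) {
                // ack the local broker here once the remote broker has accepted the message
            }
        });
    }
}
{code}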




  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else {
    if (memoryUsage.isFull()) {
        waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                + message.getProducerId() + ") stopped to prevent flooding "
                + getActiveMQDestination().getQualifiedName() + "."
                + " See http://activemq.apache.org/producer-flow-control.html for more info");
    }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has {{responseRequired=true}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has {{responseRequired=false}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
if (!message.isResponseRequired()) {
    
    // If the message was originally sent using async
    // send, we will preserve that QOS
    // by bridging it using an async send (small chance
    // of message loss).
    try {
        remoteBroker.oneway(message);
        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
        dequeueCounter.incrementAndGet();
    } finally {
        sub.decrementOutstandingResponses();
    }
    
} else {
    
    // The message was not sent using async send, so we
    // should only ack the local
    // broker when we get confirmation that the remote
    // broker has received the message.
    ResponseCallback callback = new ResponseCallback() {
        public void onCompletion(FutureResponse future) {
            try {
                Response response = future.getResult();
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse) response;
                    serviceLocalException(er.getException());
                } else {
                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                    dequeueCounter.incrementAndGet();
                }   
            } catch (IOException e) {
                serviceLocalException(e);
            } finally {
                sub.decrementOutstandingResponses();
            }
        }
    };
    
    remoteBroker.asyncRequest(message, callback);
}
{code}





> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has {{responseRequired=true}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has {{responseRequired=false}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Timothy Bish (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish updated AMQ-3331:
------------------------------

    Fix Version/s: 5.6.0
    
> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>             Fix For: 5.6.0
>
>         Attachments: AMQ-3331.patch, NetworkBridgeProducerFlowControlPrePatchTest.java, NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has {{responseRequired=true}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has {{responseRequired=false}}, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer ceases to care and is no longer waiting for a response.  Once a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the inflight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because producer flow control on one queue can have the serious side effect of blocking the network bridge completely, I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
> Alternatively (and unnecessarily if the {{remoteBroker.oneway(message);}} is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
> A unit test is also included that demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Take particular note of the final assertions in the unit test -- the test is designed to pass as-is, with the expectations modified to validate the bad behaviour where necessary.  Ideally, the bad behaviour should cause a test case failure if you feel that blocking the entire network bridge is a bug.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Attachment: NetworkBridgeProducerFlowControlPrePatchTest.java

Unit test that works with AMQ 5.5.0 to demonstrate how the network bridge is blocked.  The unit test passes when persistent messages are sent and fails with non-persistent messages.

> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>         Attachments: NetworkBridgeProducerFlowControlPrePatchTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of using {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}.
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer ceases to care and is not waiting for a response.  Once a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the inflight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because the blocking of the network bridge by producer flow control on one queue can have a serious side effect (blocking the bridge completely), I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
> Alternatively (and unnecessarily, if the {{remoteBroker.oneway(message);}} call is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
> A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is, with the expectations modified to validate the bad behaviour where necessary.  Ideally, the bad behaviour should cause the test case to fail if you feel that blocking the entire network bridge is a bug.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
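
For reference, here is a minimal sketch of the kind of two-broker topology described above, built with the Java broker API; the broker names, ports, and the small memory limit are illustrative assumptions, not values taken from the attached test:

{code:title=Two-broker bridge topology (illustrative sketch)}
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;

public class TwoBrokerBridgeSetup {
    public static void main(String[] args) throws Exception {
        // Broker B: the "remote" side, with a small per-queue memory limit so that
        // producer flow control kicks in once its copy of Q2 fills up.
        BrokerService brokerB = new BrokerService();
        brokerB.setBrokerName("brokerB");
        brokerB.setPersistent(false);
        brokerB.addConnector("tcp://localhost:61617");

        PolicyEntry queuePolicy = new PolicyEntry();
        queuePolicy.setQueue(">");                // apply to all queues
        queuePolicy.setProducerFlowControl(true);
        queuePolicy.setMemoryLimit(50 * 1024);    // small limit so Q2 fills quickly
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(queuePolicy);
        brokerB.setDestinationPolicy(policyMap);
        brokerB.start();

        // Broker A: the "local" side, connected to Broker B by a demand forwarding
        // bridge with a network prefetch size of 1, as in the scenario above.
        BrokerService brokerA = new BrokerService();
        brokerA.setBrokerName("brokerA");
        brokerA.setPersistent(false);
        brokerA.addConnector("tcp://localhost:61616");
        NetworkConnector bridge = brokerA.addNetworkConnector("static:(tcp://localhost:61617)");
        bridge.setPrefetchSize(1);
        brokerA.start();
    }
}
{code}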

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
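
For illustration, assuming the default connection settings (no {{useAsyncSend}} or {{alwaysSyncSend}} overrides), the producer-side choices that determine {{responseRequired}} look roughly like this:

{code:title=Producer send modes (illustrative sketch)}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Connection connection =
            new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        connection.start();

        // Persistent send outside a transaction: the client issues a synchronous
        // request (responseRequired=true), so Broker B can defer its response under
        // producer flow control without blocking the bridge's transport thread.
        Session syncSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer syncProducer = syncSession.createProducer(syncSession.createQueue("Q1"));
        syncProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        syncProducer.send(syncSession.createTextMessage("persistent, non-transacted"));

        // Non-persistent send (a transacted persistent send behaves the same way):
        // the client issues an async oneway (responseRequired=false), which is the
        // case that ends up blocking Broker B's transport thread when a queue is full.
        Session asyncSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer asyncProducer = asyncSession.createProducer(asyncSession.createQueue("Q2"));
        asyncProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        asyncProducer.send(asyncSession.createTextMessage("non-persistent"));

        connection.close();
    }
}
{code}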

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
if (!message.isResponseRequired()) {
    
    // If the message was originally sent using async
    // send, we will preserve that QOS
    // by bridging it using an async send (small chance
    // of message loss).
    try {
        remoteBroker.oneway(message);
        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
        dequeueCounter.incrementAndGet();
    } finally {
        sub.decrementOutstandingResponses();
    }
    
} else {
    
    // The message was not sent using async send, so we
    // should only ack the local
    // broker when we get confirmation that the remote
    // broker has received the message.
    ResponseCallback callback = new ResponseCallback() {
        public void onCompletion(FutureResponse future) {
            try {
                Response response = future.getResult();
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse) response;
                    serviceLocalException(er.getException());
                } else {
                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                    dequeueCounter.incrementAndGet();
                }   
            } catch (IOException e) {
                serviceLocalException(e);
            } finally {
                sub.decrementOutstandingResponses();
            }
        }
    };
    
    remoteBroker.asyncRequest(message, callback);
}
{code}

The apparent preservation of {{responseRequired}} is a result of using {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}.

Solution
========
It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer ceases to care and is not waiting for a response.  Once a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the inflight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.

Because the blocking of the network bridge by producer flow control on one queue can have a serious side effect (blocking the bridge completely), I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
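
A minimal sketch of what that simplification might look like in {{serviceLocalCommand}} (not the attached patch, just the shape of the change; the callback is the one already shown above):

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...) (sketch of proposed change)}
// Always forward with a request, regardless of message.isResponseRequired(),
// so the remote broker can hold back its response under producer flow control
// instead of blocking the bridge's single transport thread.
ResponseCallback callback = new ResponseCallback() {
    public void onCompletion(FutureResponse future) {
        try {
            Response response = future.getResult();
            if (response.isException()) {
                ExceptionResponse er = (ExceptionResponse) response;
                serviceLocalException(er.getException());
            } else {
                // Ack the local broker only once the remote broker has the message.
                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                dequeueCounter.incrementAndGet();
            }
        } catch (IOException e) {
            serviceLocalException(e);
        } finally {
            sub.decrementOutstandingResponses();
        }
    }
};
remoteBroker.asyncRequest(message, callback);
{code}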

Alternatively (and unnecessarily, if the {{remoteBroker.oneway(message);}} call is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.

{{remoteDispatchType}} can have one of three values:
# {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
# {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
# {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
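
Assuming the patch exposes {{remoteDispatchType}} as an ordinary bean property on the network connector configuration, usage would look something like the following (the setter name and argument type are hypothetical, shown only to illustrate how the option would be applied):

{code:title=Configuring remoteDispatchType (hypothetical usage)}
// Hypothetical: setRemoteDispatchType(...) is the setter the attached patch would
// add to NetworkBridgeConfiguration; AUTO/ALWAYS_SYNC/ALWAYS_ASYNC are the three
// values described above.
NetworkConnector bridge = brokerA.addNetworkConnector("static:(tcp://localhost:61617)");
bridge.setPrefetchSize(1);
bridge.setRemoteDispatchType("ALWAYS_SYNC"); // never block the bridge on producer flow control
{code}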

A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is, with the expectations modified to validate the bad behaviour where necessary.  Ideally, the bad behaviour should cause the test case to fail if you feel that blocking the entire network bridge is a bug.
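
For example, the kind of final assertion involved looks roughly like this (names and counters are illustrative, not taken from the attached test):

{code:title=Shape of the final assertions (illustrative sketch)}
// With the queues treated independently, the Q1 consumer should receive every
// message even while Q2 is still held back by producer flow control.
assertEquals("Q1 forwarding should not be throttled by Q2's flow control",
        MESSAGE_COUNT, q1ReceivedCount.get());
assertTrue("Q2 should still be limited by producer flow control",
        q2ReceivedCount.get() < MESSAGE_COUNT);
{code}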

  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
if (!message.isResponseRequired()) {
    
    // If the message was originally sent using async
    // send, we will preserve that QOS
    // by bridging it using an async send (small chance
    // of message loss).
    try {
        remoteBroker.oneway(message);
        localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
        dequeueCounter.incrementAndGet();
    } finally {
        sub.decrementOutstandingResponses();
    }
    
} else {
    
    // The message was not sent using async send, so we
    // should only ack the local
    // broker when we get confirmation that the remote
    // broker has received the message.
    ResponseCallback callback = new ResponseCallback() {
        public void onCompletion(FutureResponse future) {
            try {
                Response response = future.getResult();
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse) response;
                    serviceLocalException(er.getException());
                } else {
                    localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                    dequeueCounter.incrementAndGet();
                }   
            } catch (IOException e) {
                serviceLocalException(e);
            } finally {
                sub.decrementOutstandingResponses();
            }
        }
    };
    
    remoteBroker.asyncRequest(message, callback);
}
{code}

The apparent preservation of {{responseRequired}} is a result of using {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}.





> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of using {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}.
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer ceases to care and is not waiting for a response.  Once a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the inflight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because the blocking of the network bridge by producer flow control on one queue can have a serious side effect (blocking the bridge completely), I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}.
> Alternatively (and unnecessarily, if the {{remoteBroker.oneway(message);}} call is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
> A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is, with the expectations modified to validate the bad behaviour where necessary.  Ideally, the bad behaviour should cause the test case to fail if you feel that blocking the entire network bridge is a bug.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}



  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:label=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:label=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
} else {
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}




> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent messages (and transactional/non-transactional sends) is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=public void send(final ProducerBrokerExchange producerExchange, final Message message)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.

This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

  was:
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
  1) Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
  2) Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.

This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====



[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Attachment: NetworkBridgeProducerFlowControlTest.java

Unit test that verifies the patch.  
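
For readers without the attachment handy, a bare-bones setup of the kind such a test presumably uses might look like the following sketch.  Broker names, URIs, the memory limit, and the prefetch value are illustrative assumptions, not taken from the attached test.

{code:title=Two-broker bridge setup (illustrative sketch)}
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;

public class BridgeFlowControlSetupSketch {
    public static void main(String[] args) throws Exception {
        // Broker B: the remote broker, with a small per-queue memory limit so
        // that a slow consumer fills a queue and triggers producer flow control.
        BrokerService brokerB = new BrokerService();
        brokerB.setBrokerName("brokerB");
        brokerB.setPersistent(false);
        brokerB.addConnector("tcp://localhost:61617");

        PolicyEntry policy = new PolicyEntry();
        policy.setQueue(">");                 // apply to all queues
        policy.setMemoryLimit(100 * 1024);    // deliberately small (assumed value)
        policy.setProducerFlowControl(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policy);
        brokerB.setDestinationPolicy(policyMap);
        brokerB.start();

        // Broker A: the local broker, bridged to Broker B over TCP by a demand
        // forwarding bridge with a network prefetch of 1.
        BrokerService brokerA = new BrokerService();
        brokerA.setBrokerName("brokerA");
        brokerA.setPersistent(false);
        brokerA.addConnector("tcp://localhost:61616");
        NetworkConnector bridge = brokerA.addNetworkConnector("static:(tcp://localhost:61617)");
        bridge.setPrefetchSize(1);
        brokerA.start();
    }
}
{code}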

> When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.
> -----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3331
>                 URL: https://issues.apache.org/jira/browse/AMQ-3331
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Test Cases, Transport
>    Affects Versions: 5.5.0
>            Reporter: Stirling Chow
>         Attachments: AMQ-3331.patch, NetworkBridgeProducerFlowControlPrePatchTest.java, NetworkBridgeProducerFlowControlTest.java
>
>
> Symptom
> =======
> Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.
> At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.
> If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->0     ------>       0->1000->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
> # Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
> # Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.
> This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).
> If the messages produced by Broker A are *non-persistent*, we see this behaviour:
> {noformat}
> Broker A       Bridge        Broker B
> ========                     ========
> 0->1000->500   ------>       0->500->...
> 0->1000->500                 0->500->...
> {noformat}
> The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.
> This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.
> It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.
> These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
> Cause
> =====
> The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:
> {code:title=Queue#send(...)}
> // We can avoid blocking due to low usage if the producer is
> // sending
> // a sync message or if it is using a producer window
> if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
> {code}
> and
> {code:title=Queue#send(...)}
> } else 
>   if (memoryUsage.isFull()) {
>     waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
>                                 + message.getProducerId() + ") stopped to prevent flooding "
>                                 + getActiveMQDestination().getQualifiedName() + "."
>                                 + " See http://activemq.apache.org/producer-flow-control.html for more info");
>   }
> {code}
> There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent message is sent to Broker A's queues outside a transaction, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).
> When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked in {{waitForSpace(...)}}.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
> The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:
> {code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
> if (!message.isResponseRequired()) {
>     
>     // If the message was originally sent using async
>     // send, we will preserve that QOS
>     // by bridging it using an async send (small chance
>     // of message loss).
>     try {
>         remoteBroker.oneway(message);
>         localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>         dequeueCounter.incrementAndGet();
>     } finally {
>         sub.decrementOutstandingResponses();
>     }
>     
> } else {
>     
>     // The message was not sent using async send, so we
>     // should only ack the local
>     // broker when we get confirmation that the remote
>     // broker has received the message.
>     ResponseCallback callback = new ResponseCallback() {
>         public void onCompletion(FutureResponse future) {
>             try {
>                 Response response = future.getResult();
>                 if (response.isException()) {
>                     ExceptionResponse er = (ExceptionResponse) response;
>                     serviceLocalException(er.getException());
>                 } else {
>                     localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
>                     dequeueCounter.incrementAndGet();
>                 }   
>             } catch (IOException e) {
>                 serviceLocalException(e);
>             } finally {
>                 sub.decrementOutstandingResponses();
>             }
>         }
>     };
>     
>     remoteBroker.asyncRequest(message, callback);
> }
> {code}
> The apparent preservation of {{responseRequired}} is a result of {{remoteBroker.oneway(message);}} versus {{remoteBroker.asyncRequest(message, callback);}}
> Solution
> ========
> It seems odd that there should be any concern for the message's original {{responseRequired}} flag.  Once the message is dispatched to the bridge for forwarding, the original producer ceases to care and is not waiting for a response.  Once a response is returned from the remote broker, it is only used to signal the message ACK to the local broker so that the in-flight and dequeue counts can be updated; neither the response nor the ACK continues on to the original producer.
> Because the blocking of the network bridge by producer flow control on one queue can have a serious side effect (blocking the bridge completely), I think the best solution is to remove the logic from DemandForwardingBridgeSupport that takes {{message.isResponseRequired}} into account and simply always forward the message with {{remoteBroker.asyncRequest(message, callback);}}
> Alternatively (and unnecessarily so if the {{remoteBroker.oneway(message);}} path is removed), I've attached a patch that adds a {{remoteDispatchType}} field to org.apache.activemq.network.NetworkBridgeConfiguration.
> {{remoteDispatchType}} can have one of three values:
> # {{AUTO}} - DemandForwardingBridgeSupport works as described above and uses {{remoteBroker.oneway(message);}} or {{remoteBroker.asyncRequest(message, callback);}} depending on {{message.isResponseRequired}}
> # {{ALWAYS_SYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.asyncRequest(message, callback);}} to forward all messages (i.e., it behaves the same as the first suggested solution)
> # {{ALWAYS_ASYNC}} - DemandForwardingBridgeSupport uses {{remoteBroker.oneway(message);}} to forward all messages
> A unit test is also included which demonstrates the good/bad behaviour for all combinations of persistent/non-persistent and {{remoteDispatchType}}.  Pay particular attention to the final assertions in the unit test -- the test is designed to pass as-is by modifying the expectations to validate the bad behaviour when necessary.  Ideally, the bad behaviour should cause test case failure if you feel that blocking the entire network bridge is a bug.
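
To illustrate the first suggested solution in the quoted description above in code terms: with the {{message.isResponseRequired()}} branch removed, {{serviceLocalCommand(...)}} would forward every message the same way the current else-branch does.  The following is only a sketch of the shape of that change, not the attached patch; it reuses the callback already shown in the excerpt above.

{code:title=Always forward via asyncRequest (sketch, not the attached patch)}
// Regardless of how the original producer sent the message, forward it as a
// request and only ack the local broker once the remote broker has responded.
// Because the forwarded command now requires a response, the remote Queue
// defers the response while it is full instead of blocking the bridge's
// single transport thread in waitForSpace(...).
ResponseCallback callback = new ResponseCallback() {
    public void onCompletion(FutureResponse future) {
        try {
            Response response = future.getResult();
            if (response.isException()) {
                serviceLocalException(((ExceptionResponse) response).getException());
            } else {
                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                dequeueCounter.incrementAndGet();
            }
        } catch (IOException e) {
            serviceLocalException(e);
        } finally {
            sub.decrementOutstandingResponses();
        }
    }
};
remoteBroker.asyncRequest(message, callback);
{code}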


[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent message is sent to Broker A's queues outside a transaction, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked in {{waitForSpace(...)}}.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
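
For reference, the mapping from JMS send options to synchronous versus asynchronous sends (and therefore to {{responseRequired}}) can be seen with a plain client.  This is a minimal sketch of ordinary client code; the broker URI is an assumption, and the comments describe the ActiveMQ client's default behaviour rather than anything specific to the bridge.

{code:title=Send mode vs. responseRequired (illustrative sketch)}
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class SendModeSketch {
    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        // Non-transacted session: by default the ActiveMQ client sends persistent
        // messages synchronously (responseRequired=true) and non-persistent
        // messages asynchronously (responseRequired=false).
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(session.createQueue("Q1"));
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);      // sync send -> bridge keeps flowing
        producer.send(session.createTextMessage("sync"));
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  // async send -> bridge can be blocked
        producer.send(session.createTextMessage("async"));

        // Transacted session: sends are asynchronous regardless of delivery mode,
        // which is why persistent messages inside a transaction show the same
        // behaviour as non-persistent messages.
        Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer txProducer = txSession.createProducer(txSession.createQueue("Q2"));
        txProducer.send(txSession.createTextMessage("async, in a transaction"));
        txSession.commit();

        connection.close();
    }
}
{code}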






[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional/non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is
// sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else 
  if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
                                + message.getProducerId() + ") stopped to prevent flooding "
                                + getActiveMQDestination().getQualifiedName() + "."
                                + " See http://activemq.apache.org/producer-flow-control.html for more info");
  }
{code}

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent message is sent to Broker A's queues outside a transaction, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked in {{waitForSpace(...)}}.  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.

The preservation of the {{responseRequired}} flag occurs in org.apache.activemq.network.DemandForwardingBridgeSupport:

{code:title=DemandForwardingBridgeSupport#serviceLocalCommand(...)}
                        
                        if (!message.isResponseRequired()) {
                            
                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                dequeueCounter.incrementAndGet();
                            } finally {
                                sub.decrementOutstandingResponses();
                            }
                            
                        } else {
                            
                            // The message was not sent using async send, so we
                            // should only ack the local
                            // broker when we get confirmation that the remote
                            // broker has received the message.
                            ResponseCallback callback = new ResponseCallback() {
                                public void onCompletion(FutureResponse future) {
                                    try {
                                        Response response = future.getResult();
                                        if (response.isException()) {
                                            ExceptionResponse er = (ExceptionResponse) response;
                                            serviceLocalException(er.getException());
                                        } else {
                                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                            dequeueCounter.incrementAndGet();
                                        }   
                                    } catch (IOException e) {
                                        serviceLocalException(e);
                                    } finally {
                                        sub.decrementOutstandingResponses();
                                    }
                                }
                            };
                            
                            remoteBroker.asyncRequest(message, callback);
                            
                        }
{code}
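
The practical difference between the two branches above is what goes onto the wire: {{oneway}} transmits the message with whatever {{responseRequired}} value it already carries, while a request-style send marks the command as requiring a response before transmitting it, so the remote Queue takes the non-blocking flow control path.  The following is a simplified sketch of that distinction, not a verbatim excerpt from the transport code.

{code:title=oneway vs asyncRequest (simplified sketch)}
// oneway: the command is written as-is.  For an async-sent (non-persistent or
// transacted) message, responseRequired stays false, so Queue#send on the
// remote broker blocks the bridge's transport thread in waitForSpace(...).
remoteBroker.oneway(message);

// asyncRequest: the transport's response correlator assigns a command id and
// sets responseRequired=true before sending, then invokes the callback when
// the response eventually arrives.  Queue#send on the remote broker therefore
// takes the non-blocking branch and merely withholds the response while full.
remoteBroker.asyncRequest(message, callback);
{code}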






[jira] [Updated] (AMQ-3331) When a producer from a network bridge is blocked by producer flow control, all producers from the network bridge get blocked.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3331:
-------------------------------

    Description: 
Symptom
=======
Broker A produces messages to two queues, Q1 and Q2.  Broker B consumes messages from two queues, Q1 and Q2.  Broker A is connected by a demand forwarding bridge, over TCP, to Broker B so that messages produced to Q1/Q2 will be forwarded to the consumers on Broker B.

At some point, Broker B's instance of Q2 becomes full (e.g., because the Q2 consumer is slow), and this triggers producer flow control to halt new messages being sent to Broker B's Q2 over the bridge.  Broker A's instances of Q1/Q2 are not full, so the producers on Broker A are not blocked.

If the messages produced by Broker A are *persistent*, we see this behaviour over the course of the production of 1000 messages to both Q1/Q2, where Broker B's Q2 becomes full on the 500th message:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->0     ------>       0->1000->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are what we expected, namely: 
# Broker A produces 1000 messages to Q1 without blocking and all of these messages are forwarded to Broker B's Q1 without blocking, eventually being consumed by Broker B's Q1 consumer.
# Broker A produces 1000 messages to Q2 without blocking and 500 of these messages are forwarded to Broker B's Q2 before producer flow control blocks the flow until Broker B's Q2 consumer can start reducing the queue size.


This is good because the bridge treats Q1 and Q2 independently (i.e., producer flow control on Q2 does not block the messages forwarded to Q1).

If the messages produced by Broker A are *non-persistent*, we see this behaviour:

{noformat}
Broker A       Bridge        Broker B
========                     ========
0->1000->500   ------>       0->500->...
0->1000->500                 0->500->...
{noformat}

The above results, which assume network and consumer prefetch sizes of 1, are not what we expected, namely: producer flow control on Broker B's instance of Q2 blocks the forwarding of messages to Broker B's instance of Q1.

This is not good because producer flow control on Q2 essentially triggers producer flow control on Q1, even though Q1 is *not* full.

It also seems strange (and almost counter-intuitive until you understand the cause) that persistent messages should behave better than non-persistent messages.  The same difference in behaviour can also be observed with persistent messages if Broker A sends them outside a JMS transaction (e.g., AUTO_ACKNOWLEDGE) versus inside a JMS transaction: outside behaves appropriately, with Q1 independent of Q2, but inside behaves the same as the non-persistent case, with Q1 blocked by Q2.

These observations are contrary to the AMQ 5.0 documentation regarding producer flow control: {quote}As of ActiveMQ 5.0, we can now individually flow control each producer on a shared connection without having to suspend the entire connection.{quote}
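The following is a minimal, hedged sketch of the topology used to reproduce this (it is not the attached test case); the broker names, ports, and memory limit are illustrative assumptions.  Broker B gets a small per-destination memory limit so producer flow control engages quickly on its instance of Q2, and Broker A bridges to it with a network prefetch of 1:

{code:title=BridgeFlowControlRepro.java}
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

public class BridgeFlowControlRepro {
    public static void main(String[] args) throws Exception {
        // Broker B: the "remote" end of the bridge, with a small memory limit
        // per destination so producer flow control engages quickly on Q2.
        BrokerService brokerB = new BrokerService();
        brokerB.setBrokerName("B");
        brokerB.setPersistent(false);
        brokerB.addConnector("tcp://localhost:61617");

        PolicyEntry entry = new PolicyEntry();
        entry.setProducerFlowControl(true);
        entry.setMemoryLimit(50 * 1024);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(entry);
        brokerB.setDestinationPolicy(policyMap);
        brokerB.start();

        // Broker A: the "local" end, with a demand-forwarding bridge to B and
        // a network prefetch of 1, as in the scenario above.
        BrokerService brokerA = new BrokerService();
        brokerA.setBrokerName("A");
        brokerA.setPersistent(false);
        brokerA.addConnector("tcp://localhost:61616");
        brokerA.addNetworkConnector("static:(tcp://localhost:61617)")
               .setPrefetchSize(1);
        brokerA.start();

        // Producers on A then send 1000 messages each to Q1 and Q2, while
        // consumers on B drain Q1 quickly and Q2 slowly.  With asynchronous
        // (non-persistent/transacted) sends, forwarding to Q1 stalls as soon
        // as Q2 on B is full.
    }
}
{code}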

Cause
=====

The difference in behaviour between persistent and non-persistent (and transactional and non-transactional) messages is due to the two ways that org.apache.activemq.broker.region.Queue implements producer flow control:

{code:title=Queue#send(...)}
// We can avoid blocking due to low usage if the producer is sending
// a sync message or if it is using a producer window
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
{code}

and

{code:title=Queue#send(...)}
} else if (memoryUsage.isFull()) {
    waitForSpace(context, memoryUsage, "Usage Manager Memory Limit is full. Producer ("
            + message.getProducerId() + ") stopped to prevent flooding "
            + getActiveMQDestination().getQualifiedName() + "."
            + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
{code}
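To make the difference concrete, here is a stripped-down sketch of the two strategies (this is not ActiveMQ code; names like pendingResponses and respondToProducer are invented for illustration): the first path defers the producer's response while leaving the calling thread free, the second parks the calling thread in a waitForSpace()-style loop until the destination has room.

{code:title=FlowControlSketch.java}
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Simplified illustration of the two producer-flow-control paths. */
public class FlowControlSketch {
    private final int limit = 2;  // stand-in for the destination memory limit
    private final Queue<String> store = new ConcurrentLinkedQueue<String>();
    private final Queue<Runnable> pendingResponses = new ConcurrentLinkedQueue<Runnable>();

    private boolean isFull() {
        return store.size() >= limit;
    }

    /** Path 1 (responseRequired=true): the calling thread never blocks. */
    public void sendSync(final String msg, final Runnable respondToProducer) {
        if (isFull()) {
            // Hold the response back; the producer waits for it, but the
            // transport thread that delivered the message can keep going.
            pendingResponses.add(new Runnable() {
                public void run() {
                    store.add(msg);
                    respondToProducer.run();
                }
            });
            return;
        }
        store.add(msg);
        respondToProducer.run();
    }

    /** Path 2 (responseRequired=false): the calling thread blocks. */
    public void sendAsync(String msg) throws InterruptedException {
        while (isFull()) {
            // Equivalent of waitForSpace(): the single transport thread of the
            // bridge parks here, so *all* forwarding over the bridge stops.
            Thread.sleep(10);
        }
        store.add(msg);
    }

    /** Consuming a message frees space and releases one held response. */
    public void consumeOne() {
        store.poll();
        Runnable held = pendingResponses.poll();
        if (held != null) {
            held.run();
        }
    }
}
{code}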

There is only a single transport thread that services the TCP socket on Broker B.  This TCP socket is the "remote" end of the A->B bridge and is responsible for *sequentially* enqueueing to Broker B's queues all messages from Broker A.  When a persistent, non-transactional message is sent to Broker A's queues, it has +responseRequired=true+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the first method of producer flow control will be used: the transport thread will not block, but the response will be held back until the queue has room.  As a result, the transport thread is free to continue enqueueing messages from the bridge, particularly those destined for queues that are not full (NOTE: since the network prefetch is 1, no new messages to the full queue will be forwarded until the response is returned).

When a non-persistent or transactional message is sent to Broker A's queues, it has +responseRequired=false+, which is preserved when the bridge forwards the message to Broker B's queues.  If producer flow control is triggered on Broker B's queue, the second method of producer flow control will be used: the transport thread will be blocked in waitForSpace().  As a result, no other messages from the bridge will be forwarded, even those destined for queues that are not full.
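For reference, here is a hedged JMS-level sketch of which sends end up synchronous (responseRequired=true) versus asynchronous (responseRequired=false), assuming default connection options (no useAsyncSend, no alwaysSyncSend, no producer window); the broker URL and queue names are assumptions:

{code:title=SendModeExample.java}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendModeExample {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();

        // Persistent, non-transacted send: dispatched synchronously, so the
        // forwarded message carries responseRequired=true and takes the
        // non-blocking flow-control path on Broker B.
        Session autoAck = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer syncProducer = autoAck.createProducer(autoAck.createQueue("Q1"));
        syncProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        syncProducer.send(autoAck.createTextMessage("sync send"));

        // Non-persistent (or any transacted) send: dispatched asynchronously,
        // responseRequired=false, so Broker B falls into waitForSpace() and
        // blocks the bridge's transport thread when the queue is full.
        Session transacted = connection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer asyncProducer = transacted.createProducer(transacted.createQueue("Q2"));
        asyncProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        asyncProducer.send(transacted.createTextMessage("async send"));
        transacted.commit();

        connection.close();
    }
}
{code}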




--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira