You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by gaohoward <gi...@git.apache.org> on 2016/12/12 01:49:20 UTC

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

GitHub user gaohoward opened a pull request:

    https://github.com/apache/activemq-artemis/pull/907

    ARTEMIS-883 Fix OpenWire ProducerFlowControlTest Regression

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gaohoward/activemq-artemis master_fixOWtest1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #907
    
----
commit 4e0c88c984af54b334c28087008ba3ee1aa9f5ce
Author: Howard Gao <ho...@gmail.com>
Date:   2016-12-11T11:14:14Z

    ARTEMIS-883 Fix OpenWire ProducerFlowControlTest Regression

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gtully <gi...@git.apache.org>.
Github user gtully commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92598858
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    in that case, the broker will stop reading from the connection b/c it will block waiting for memory and the client will eventually block on the tcp send buffer, ie: socket write.
    
    In artemis this would be a case of disabling read on that selector till there is some space on the broker to enqueue the message. When the client is sending async and the broker is limited all we can do is stop reading.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r93464460
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java ---
    @@ -64,6 +64,9 @@
        @Before
        public void setUp() throws Exception {
           super.setUp();
    +      System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "5");
    --- End diff --
    
    shouldn't you remove this property on tearDown?
    
    What if another test is expecting this property differently? even if it's not the case now, I think you should clear it on tearDown.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    @clebertsuconic Just updated the PR using setAutoRead. Maybe we can fix the TTL issue in a separate JIRA?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92602072
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    @ghoward. That's exactly how it works on core protocol.  Let's just do setAutoRead(false).  But we need to fix the Ttl case while blocked. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92531732
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    @gaohoward from what I understood, it would wait the server to send an ack before send the next.. it's sync on that case, so it will wait the ack.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92070977
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    --- End diff --
    
    I don't quite understand this, the client should in this case either get the exception immediately (callback is called on the calling thread directly), or blocking to wait for a response (callback is called on a possible different thread).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    The test failure has nothing to do with this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gtully <gi...@git.apache.org>.
Github user gtully commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92431065
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    yep. If a producer indicates a producerWindowSize > 0 on each send there is a producerAck to replenish the client. If there is no space on the broker, producerWindow num messages can be pending a response and there will be no producerAck. Then the client blocks because it has no window. It blocks client side in the call to producer.send.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92600853
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    @gtully Thanks Gary. If we disabling read at the broker side, would the producer still write some more bytes into tcp buffer before fully blocked? does that matter?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92425813
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    I just spoke to @gtully. The right implementation here should be only send ProducerACK upon the space availabitily. Similar to what we do for credits on the core protocol. it will get the same effect and it won't interrupt TTL checks. Which is what we need.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    @gaohoward  I have fixed the things myself.. can you look later on master please?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    ok, I'll do it. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r93465898
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -303,108 +301,103 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
    --- End diff --
    
    @gaohoward sorry Howard, this looks a hack to me... and it's even harder to debug what's going on...
    store.checkMemory() will happen outside of the context of the caller's thread...
    
    
    What I meant by the Runnable will throw a Thread, is that you should fix it properly.. not adding this kind of thing that will never meant to run... The thread still set an exception on a finally that will never happen as the caller's thread is gone...
    
    
    
    Instead of throwing an exception, your runnable should call connection.sendException() and interrupt execution, instead of having this code in place.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92072316
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    another issue with setAutoRead() I forgot to mention is that the client producer seems to have some TCP buffer when auto read is false. So the producer seems always can write some more bytes into the TCP layer without knowing that they are actually stay at client side buffer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r91977700
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    Even worse.. you are blocking a thread.. that's not a valid fix!!!!!
    
    
    This is supposed to be treated with asynchronous callbacks...
    
    
    The test may not be working, but the blocker is working.. you just made it blocking a thread... which is really far from what i wanted here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r93466239
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -303,108 +301,103 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    -
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
              ServerMessage coreMsg = originalCoreMsg.copy();
    -
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         coreMsg.setAddress(address);
     
              if (actualDestinations[i].isQueue()) {
                 checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    -
    -         if (actualDestinations[i].isQueue()) {
                 coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
              } else {
                 coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
              }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
     
    -         if (runToUse != null) {
    -            // if the timeout is >0, it will wait this much milliseconds
    -            // before running the the runToUse
    -            // this will eventually unblock blocked destinations
    -            // playing flow control
    -            store.checkMemory(runToUse);
    +         this.connection.disableTtl();
    +         if (shouldBlockProducer) {
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
    +
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    --- End diff --
    
    remove these..


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92525337
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    @gtully hey Gary, what should broker response to a producer having a 0 window size and sending non-persistent messages when broker space is full? Suppose syncSend is not forced on connection.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/activemq-artemis/pull/907


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r91951349
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    this latch.await here is useless... you are declaring it in the same thread... it will either always countdown or throw an exception.
    
    
    can you either remove it or fix the logic?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92076475
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    @gaohoward we should do the opposite. We fix the test, we don't break the server.
    
    Lets make use of credits.. and stop sending credits back to the client. WE just need to make sure we have blocking working.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    It's not just the TTL, the exception inside the Runnable.
    
    There's also a case where you set Dont Responde but I don't see it being set back.
    
    
    this needs more testing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    @clebertsuconic I've added the test for ttl and just fixed the Dont Response issue. I'll see if the tests all pass.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92076234
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    Yes the openwire producer flow control has a window size to use, but it mostly controlled at client side (it doesn't ask for credits from server like core does). 
    The reason why we block the entire connection is that openwire has a specific test for this use case:
    org.apache.activemq.artemis.tests.integration.openwire.amq.ProducerFlowControlTest#test2ndPublisherWithStandardConnectionThatIsBlocked
    It is usually not a good option but there are other options the user can use (see other tests in ProducerFlowControlTest).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    @clebertsuconic I'm looking at some regressions caused by this PR. pls don't merge now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92077376
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    Let me think about it... the first thing pops up in my mind is that we can't change the client (openwire) code, but the credits thing requires client-server coordination...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r91985311
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    --- End diff --
    
    you are sending an exception inside the Runnable callback on the checkMemory... this is not valid...
    
    The client may never get this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    @clebertsuconic the openwire tests all pass now. I think it's ready for review.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92074326
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    I strongly disagree.. we are not supposed to block...
    
    
    when you block a Thread, you are also blocking all the sessions on the connection.. so just use setAutoRead(false) instead of blocking a thread.
    
    We have a neat non-blocking architecture in place. We have to avoid these kind of things. Especially that the producer might be blocked for a long time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92524664
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    Thanks guys I'll come back to it later today. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis issue #907: ARTEMIS-883 Fix OpenWire ProducerFlowControlTes...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on the issue:

    https://github.com/apache/activemq-artemis/pull/907
  
    Summary for the situation of this PR:
    
    
    This PR is about fixing blocking client producers in OpenWire...
    
    The implementation is currently using setAutoRead, and there is a test failing Howard was trying to fix it.
    
    
    
    The PR here is blocking a thread for non persistent, which I -1000 about it.
    
    This should really either setAutoRead (false) from Netty, or stop sending confirmation back to clients..
    
    
    Also there's an issue that I see that if a producer is blocked for a long time connections will drop for TTL. So there should be some check for pending requests and ignore TTL checks on that case. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92074388
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    The idea would be if this was based on credits instead of blocking. Isn't there any credits on openWire?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92081864
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    that's why I asked if there is already any logic in place for credits on openwire. Maybe the client already does such blocking.. and we just need to stop or resume credits accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by gaohoward <gi...@git.apache.org>.
Github user gaohoward commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92079182
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    I meant the code on client side (which is part of ActiveMQ5, not Artemis), so we can't change it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] activemq-artemis pull request #907: ARTEMIS-883 Fix OpenWire ProducerFlowCon...

Posted by clebertsuconic <gi...@git.apache.org>.
Github user clebertsuconic commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/907#discussion_r92076540
  
    --- Diff: artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---
    @@ -302,108 +301,91 @@ public void send(final ProducerInfo producerInfo,
              originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
           }
     
    -      Runnable runnable;
    -
    -      if (sendProducerAck) {
    -         runnable = new Runnable() {
    -            @Override
    -            public void run() {
    -               try {
    -                  ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    -                  connection.dispatchSync(ack);
    -               } catch (Exception e) {
    -                  ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    -                  connection.sendException(e);
    -               }
    -
    -            }
    -         };
    -      } else {
    -         final Connection transportConnection = connection.getTransportConnection();
    -
    -         if (transportConnection == null) {
    -            // I don't think this could happen, but just in case, avoiding races
    -            runnable = null;
    -         } else {
    -            runnable = new Runnable() {
    -               @Override
    -               public void run() {
    -                  transportConnection.setAutoRead(true);
    -               }
    -            };
    -         }
    -      }
    -
    -      internalSend(actualDestinations, originalCoreMsg, runnable);
    -   }
    +      boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired();
     
    -   private void internalSend(ActiveMQDestination[] actualDestinations,
    -                             ServerMessage originalCoreMsg,
    -                             final Runnable onComplete) throws Exception {
    +      final AtomicInteger count = new AtomicInteger(actualDestinations.length);
     
    -      Runnable runToUse;
    +      final Exception[] anyException = new Exception[] {null};
     
    -      if (actualDestinations.length <= 1 || onComplete == null) {
    -         // if onComplete is null, this will be null ;)
    -         runToUse = onComplete;
    -      } else {
    -         final AtomicInteger count = new AtomicInteger(actualDestinations.length);
    -         runToUse = new Runnable() {
    -            @Override
    -            public void run() {
    -               if (count.decrementAndGet() == 0) {
    -                  onComplete.run();
    -               }
    -            }
    -         };
    +      if (shouldBlockProducer) {
    +         connection.getContext().setDontSendReponse(true);
           }
     
    -      SimpleString[] addresses = new SimpleString[actualDestinations.length];
    -      PagingStore[] pagingStores = new PagingStore[actualDestinations.length];
    -
    -      // We fillup addresses, pagingStores and we will throw failure if that's the case
           for (int i = 0; i < actualDestinations.length; i++) {
              ActiveMQDestination dest = actualDestinations[i];
    -         addresses[i] = new SimpleString(dest.getPhysicalName());
    -         pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
    -         if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
    -            throw new ResourceAllocationException("Queue is full");
    -         }
    -      }
    -
    -      for (int i = 0; i < actualDestinations.length; i++) {
    +         SimpleString address = new SimpleString(dest.getPhysicalName());
    +         PagingStore store = server.getPagingManager().getPageStore(address);
     
              ServerMessage coreMsg = originalCoreMsg.copy();
    +         coreMsg.setAddress(address);
     
    -         coreMsg.setAddress(addresses[i]);
    -
    -         PagingStore store = pagingStores[i];
    -
    -         if (store.isFull()) {
    -            connection.getTransportConnection().setAutoRead(false);
    -         }
    +         if (shouldBlockProducer) {
     
    -         if (actualDestinations[i].isQueue()) {
    -            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
    -         }
    +            if (!store.checkMemory(() -> {
    +               try {
    +                  RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary());
     
    -         if (actualDestinations[i].isQueue()) {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType());
    +                  if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
    +                     throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest);
    +                  }
    +               } catch (Exception e) {
    +                  if (anyException[0] == null) {
    +                     anyException[0] = e;
    +                  }
    +               }
    +               if (count.decrementAndGet() == 0) {
    +                  if (anyException[0] != null) {
    +                     ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]);
    +                     connection.sendException(anyException[0]);
    +                  } else {
    +                     if (sendProducerAck) {
    +                        try {
    +                           ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
    +                           connection.dispatchAsync(ack);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     } else {
    +                        try {
    +                           //maybe use this: connection.getContext().setDontSendReponse(false);
    +                           Response response = new Response();
    +                           response.setCorrelationId(messageSend.getCommandId());
    +                           connection.dispatchAsync(response);
    +                        } catch (Exception e) {
    +                           ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
    +                           connection.sendException(e);
    +                        }
    +                     }
    +                  }
    +               }
    +            })) {
    +               this.connection.getContext().setDontSendReponse(false);
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
              } else {
    -            coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType());
    -         }
    -         RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
    +            //non-persistent messages goes here, by default we block on the calling thread
    +            //which means the whole connection is blocked when address is full.
    +            final CountDownLatch latch = new CountDownLatch(1);
    +            if (!store.checkMemory(() -> {
    +               latch.countDown();
    +            })) {
    +               throw new ResourceAllocationException("Queue is full " + address);
    +            }
     
    -         if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
    -            throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]);
    -         }
    +            latch.await();
    --- End diff --
    
    @gaohoward credits is always better than setAutoRead(false), otherwise we could get TTL issues.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---