You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/07/31 16:21:36 UTC

[1/3] activemq-artemis git commit: ARTEMIS-856 - Support consumersBeforeDispatch and delayBeforeDispatch

Repository: activemq-artemis
Updated Branches:
  refs/heads/master f0c13622a -> 756609f6a


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index c7021fd..6fc5019 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -170,6 +170,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Boolean defaultPurgeOnNoConsumers = null;
 
+   private Integer defaultConsumersBeforeDispatch = null;
+
+   private Long defaultDelayBeforeDispatch = null;
+
    private RoutingType defaultQueueRoutingType = null;
 
    private RoutingType defaultAddressRoutingType = null;
@@ -214,6 +218,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
       this.defaultMaxConsumers = other.defaultMaxConsumers;
       this.defaultPurgeOnNoConsumers = other.defaultPurgeOnNoConsumers;
+      this.defaultConsumersBeforeDispatch = other.defaultConsumersBeforeDispatch;
+      this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
       this.defaultQueueRoutingType = other.defaultQueueRoutingType;
       this.defaultAddressRoutingType = other.defaultAddressRoutingType;
    }
@@ -328,6 +334,24 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public int getDefaultConsumersBeforeDispatch() {
+      return defaultConsumersBeforeDispatch != null ? defaultConsumersBeforeDispatch : ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
+   }
+
+   public AddressSettings setDefaultConsumersBeforeDispatch(Integer defaultConsumersBeforeDispatch) {
+      this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch;
+      return this;
+   }
+
+   public long getDefaultDelayBeforeDispatch() {
+      return defaultDelayBeforeDispatch != null ? defaultDelayBeforeDispatch : ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
+   }
+
+   public AddressSettings setDefaultDelayBeforeDispatch(Long defaultDelayBeforeDispatch) {
+      this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch;
+      return this;
+   }
+
    public boolean isDefaultPurgeOnNoConsumers() {
       return defaultPurgeOnNoConsumers != null ? defaultPurgeOnNoConsumers : ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
    }
@@ -667,6 +691,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (defaultAddressRoutingType == null) {
          defaultAddressRoutingType = merged.defaultAddressRoutingType;
       }
+      if (defaultExclusiveQueue == null) {
+         defaultExclusiveQueue = merged.defaultExclusiveQueue;
+      }
+      if (defaultLastValueQueue == null) {
+         defaultLastValueQueue = merged.defaultLastValueQueue;
+      }
+      if (defaultConsumersBeforeDispatch == null) {
+         defaultConsumersBeforeDispatch = merged.defaultConsumersBeforeDispatch;
+      }
+      if (defaultDelayBeforeDispatch == null) {
+         defaultDelayBeforeDispatch = merged.defaultDelayBeforeDispatch;
+      }
    }
 
    @Override
@@ -767,6 +803,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          defaultExclusiveQueue = BufferHelper.readNullableBoolean(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         defaultConsumersBeforeDispatch = BufferHelper.readNullableInteger(buffer);
+      }
+
+      if (buffer.readableBytes() > 0) {
+         defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
+      }
    }
 
    @Override
@@ -805,7 +849,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(defaultPurgeOnNoConsumers) +
          DataConstants.SIZE_BYTE +
          DataConstants.SIZE_BYTE +
-         BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue);
+         BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
+         BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
+         BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch);
    }
 
    @Override
@@ -882,6 +928,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableBoolean(buffer, defaultExclusiveQueue);
 
+      BufferHelper.writeNullableInteger(buffer, defaultConsumersBeforeDispatch);
+
+      BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
+
    }
 
    /* (non-Javadoc)
@@ -928,6 +978,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((defaultPurgeOnNoConsumers == null) ? 0 : defaultPurgeOnNoConsumers.hashCode());
       result = prime * result + ((defaultQueueRoutingType == null) ? 0 : defaultQueueRoutingType.hashCode());
       result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
+      result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
+      result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
       return result;
    }
 
@@ -1133,6 +1185,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
             return false;
       } else if (!defaultAddressRoutingType.equals(other.defaultAddressRoutingType))
          return false;
+
+      if (defaultConsumersBeforeDispatch == null) {
+         if (other.defaultConsumersBeforeDispatch != null)
+            return false;
+      } else if (!defaultConsumersBeforeDispatch.equals(other.defaultConsumersBeforeDispatch))
+         return false;
+
+      if (defaultDelayBeforeDispatch == null) {
+         if (other.defaultDelayBeforeDispatch != null)
+            return false;
+      } else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
+         return false;
       return true;
    }
 
@@ -1212,6 +1276,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultQueueRoutingType +
          ", defaultAddressRoutingType=" +
          defaultAddressRoutingType +
+         ", defaultConsumersBeforeDispatch=" +
+         defaultConsumersBeforeDispatch +
+         ", defaultDelayBeforeDispatch=" +
+         defaultDelayBeforeDispatch +
          "]";
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 831b4cb..e96923d 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -510,6 +510,8 @@
                         <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
+                        <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
                         <xsd:attributeGroup ref="xml:specialAttrs"/>
                      </xsd:complexType>
                   </xsd:element>
@@ -2802,6 +2804,22 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-consumers-before-dispatch" type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default number of consumers needed before dispatch can start for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="default-delay-before-dispatch" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default delay to wait before dispatching if number of consumers before dispatch is not met for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="redistribution-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
@@ -3119,6 +3137,8 @@
       <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
+      <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 4cdd11c..8fcac20 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -423,7 +423,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals("color='blue'", queueConfiguration.getFilterString());
       assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr1", queueConfiguration.getAddress());
-      assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers());
+      // If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
+      assertEquals(null, queueConfiguration.getMaxConsumers());
 
       // Addr 1 Queue 2
       queueConfiguration = addressConfiguration.getQueueConfigurations().get(1);
@@ -431,7 +432,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals("q2", queueConfiguration.getName());
       assertTrue(queueConfiguration.isDurable());
       assertEquals("color='green'", queueConfiguration.getFilterString());
-      assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers());
+      assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers().intValue());
       assertFalse(queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr1", queueConfiguration.getAddress());
 
@@ -449,7 +450,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals("q3", queueConfiguration.getName());
       assertTrue(queueConfiguration.isDurable());
       assertEquals("color='red'", queueConfiguration.getFilterString());
-      assertEquals(10, queueConfiguration.getMaxConsumers());
+      assertEquals(10, queueConfiguration.getMaxConsumers().intValue());
       assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr2", queueConfiguration.getAddress());
 
@@ -459,7 +460,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals("q4", queueConfiguration.getName());
       assertTrue(queueConfiguration.isDurable());
       assertNull(queueConfiguration.getFilterString());
-      assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers());
+      // If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
+      assertEquals(null, queueConfiguration.getMaxConsumers());
       assertTrue(queueConfiguration.getPurgeOnNoConsumers());
       assertEquals("addr2", queueConfiguration.getAddress());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index f45a1dd..96de8c7 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -794,6 +794,41 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public int getConsumersBeforeDispatch() {
+         return 0;
+      }
+
+      @Override
+      public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+
+      }
+
+      @Override
+      public long getDelayBeforeDispatch() {
+         return 0;
+      }
+
+      @Override
+      public void setDelayBeforeDispatch(long delayBeforeDispatch) {
+
+      }
+
+      @Override
+      public long getDispatchStartTime() {
+         return 0;
+      }
+
+      @Override
+      public boolean isDispatching() {
+         return false;
+      }
+
+      @Override
+      public void setDispatching(boolean dispatching) {
+
+      }
+
+      @Override
       public void setMaxConsumer(int maxConsumers) {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-tools/src/test/resources/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 41c881e..30de90b 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -491,6 +491,8 @@
                         <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
+                        <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
                         <xsd:attributeGroup ref="xml:specialAttrs"/>
                      </xsd:complexType>
                   </xsd:element>
@@ -2498,6 +2500,22 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-consumers-before-dispatch" type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default number of consumers needed before dispatch can start for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="default-delay-before-dispatch" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default delay to wait before dispatching if number of consumers before dispatch is not met for queues under the address.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="redistribution-delay" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
@@ -2769,6 +2787,8 @@
       <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
+      <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
new file mode 100644
index 0000000..4d2d195
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerDelayDispatchTest extends JMSTestBase {
+
+   private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue");
+   private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue");
+
+   private static final long DELAY_BEFORE_DISPATCH = 10000L;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, 2, DELAY_BEFORE_DISPATCH, true);
+      server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, 0, -1, true);
+
+   }
+
+
+   protected ConnectionFactory getCF() throws Exception {
+      return cf;
+   }
+
+   @Test
+   public void testNoDelayOnDefault() throws Exception {
+      sendMessage(normalQueueName);
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+
+         Destination queue = session.createQueue(normalQueueName.toString());
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer1));
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testDelayBeforeDispatch() throws Exception {
+      sendMessage(queueName);
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+
+         Destination queue = session.createQueue(queueName.toString());
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer1));
+         Thread.sleep(DELAY_BEFORE_DISPATCH);
+
+         Assert.assertNotNull(receive(consumer1));
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testConsumersBeforeDispatch() throws Exception {
+      sendMessage(queueName);
+
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+         Destination queue = session.createQueue(queueName.toString());
+
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer1));
+
+         MessageConsumer consumer2 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer1, consumer2));
+      } finally {
+         connection.close();
+      }
+   }
+
+
+   @Test
+   public void testContinueAndResetConsumer() throws Exception {
+      sendMessage(queueName);
+
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+
+      try {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+         Destination queue = session.createQueue(queueName.toString());
+
+         MessageConsumer consumer1 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer1));
+
+         MessageConsumer consumer2 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer1, consumer2));
+
+         consumer2.close();
+
+         //Ensure that now dispatch is active, if we close a consumer, dispatching continues.
+         sendMessage(queueName);
+
+         Assert.assertNotNull(receive(consumer1));
+
+         //Stop all consumers, which should reset dispatch rules.
+         consumer1.close();
+
+         //Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers.
+         sendMessage(queueName);
+
+         MessageConsumer consumer3 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer3));
+
+         MessageConsumer consumer4 = session.createConsumer(queue);
+
+         Assert.assertNotNull(receive(consumer3, consumer4));
+
+
+         //Stop all consumers, which should reset dispatch rules.
+         consumer3.close();
+         consumer4.close();
+
+         //Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay.
+         sendMessage(queueName);
+
+         MessageConsumer consumer5 = session.createConsumer(queue);
+
+         Assert.assertNull(receive(consumer5));
+
+         Thread.sleep(DELAY_BEFORE_DISPATCH);
+
+         Assert.assertNotNull(receive(consumer5));
+
+      } finally {
+         connection.close();
+      }
+   }
+
+   private Message receive(MessageConsumer consumer1) throws JMSException {
+      return consumer1.receive(1000);
+   }
+
+   private Message receive(MessageConsumer consumer1, MessageConsumer consumer2) throws JMSException {
+      Message receivedMessage = receive(consumer1);
+      if (receivedMessage == null) {
+         receivedMessage = receive(consumer2);
+      }
+      return receivedMessage;
+   }
+
+   public void sendMessage(SimpleString queue) throws Exception {
+      ConnectionFactory fact = getCF();
+      Connection connection = fact.createConnection();
+      try {
+
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         connection.start();
+
+         Destination destination = session.createQueue(queue.toString());
+         MessageProducer producer = session.createProducer(destination);
+
+         TextMessage message = session.createTextMessage();
+         message.setText("Message");
+         producer.send(message);
+      } finally {
+         connection.close();
+      }
+   }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index ae24a45..2d78092 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -156,6 +156,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception {
+            return null;
+         }
+
+         @Override
          public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
             proxy.invokeOperation("deleteAddress", name);
          }
@@ -189,6 +194,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public String createQueue(String address, String routingType, String name, String filterStr, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
+            return null;
+         }
+
+         @Override
          public void createQueue(final String address, final String name, final boolean durable) throws Exception {
             proxy.invokeOperation("createQueue", address, name, durable);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
index e3b179b..ac2ed61 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
@@ -105,6 +105,52 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
       Assert.assertTrue(queueBinding2.getQueue().isExclusive());
    }
 
+   @Test
+   public void testQueueConfigConsumersBeforeDispatchAndRestart() throws Exception {
+      int consumersBeforeDispatch = 5;
+      ActiveMQServer server = createServer(true);
+
+      server.start();
+
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queue = new SimpleString("test.queue");
+
+      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, consumersBeforeDispatch, -1, true);
+
+      QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch());
+
+      server.stop();
+
+      server.start();
+
+      QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch());
+   }
+
+   @Test
+   public void testQueueConfigDelayBeforeDispatchAndRestart() throws Exception {
+      long delayBeforeDispatch = 5000L;
+      ActiveMQServer server = createServer(true);
+
+      server.start();
+
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queue = new SimpleString("test.queue");
+
+      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, 0, delayBeforeDispatch, true);
+
+      QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch());
+
+      server.stop();
+
+      server.start();
+
+      QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue);
+      Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch());
+   }
+
 
    @Test
    public void testQueueConfigUserAndRestart() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 192d700..71ced7f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -49,6 +49,41 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public int getConsumersBeforeDispatch() {
+      return 0;
+   }
+
+   @Override
+   public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+
+   }
+
+   @Override
+   public long getDelayBeforeDispatch() {
+      return 0;
+   }
+
+   @Override
+   public void setDelayBeforeDispatch(long delayBeforeDispatch) {
+
+   }
+
+   @Override
+   public long getDispatchStartTime() {
+      return 0;
+   }
+
+   @Override
+   public boolean isDispatching() {
+      return false;
+   }
+
+   @Override
+   public void setDispatching(boolean dispatching) {
+
+   }
+
+   @Override
    public boolean isExclusive() {
       // no-op
       return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 3f35084..44e5823 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -49,6 +49,8 @@ public class FakePostOffice implements PostOffice {
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
+                                   Integer consumersBeforeDispatch,
+                                   Long delayBeforeDispatch,
                                    SimpleString user) throws Exception {
       return null;
    }


[3/3] activemq-artemis git commit: This closes #2198

Posted by cl...@apache.org.
This closes #2198


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/756609f6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/756609f6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/756609f6

Branch: refs/heads/master
Commit: 756609f6a67bfbe546bc9d748d7278be296d4bdd
Parents: f0c1362 8a9835a
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Jul 31 12:12:00 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 31 12:12:00 2018 -0400

----------------------------------------------------------------------
 .../activemq/artemis/utils/BooleanUtil.java     |  28 +++
 .../config/ActiveMQDefaultConfiguration.java    |  12 +
 .../core/management/ActiveMQServerControl.java  |  61 +++++
 .../core/config/CoreQueueConfiguration.java     |  52 ++++-
 .../deployers/impl/FileConfigurationParser.java |  20 +-
 .../impl/ActiveMQServerControlImpl.java         |  33 ++-
 .../core/persistence/QueueBindingInfo.java      |   8 +
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../codec/PersistentQueueBindingEncoding.java   |  48 +++-
 .../artemis/core/postoffice/PostOffice.java     |   2 +
 .../core/postoffice/impl/PostOfficeImpl.java    |  10 +
 .../artemis/core/server/ActiveMQServer.java     |  27 +++
 .../activemq/artemis/core/server/Queue.java     |  14 ++
 .../artemis/core/server/QueueConfig.java        |  39 +++-
 .../core/server/impl/ActiveMQServerImpl.java    | 125 +++++++++--
 .../core/server/impl/LastValueQueue.java        |   4 +-
 .../server/impl/PostOfficeJournalLoader.java    |   2 +
 .../core/server/impl/QueueFactoryImpl.java      |   6 +-
 .../artemis/core/server/impl/QueueImpl.java     | 174 ++++++++++++---
 .../core/settings/impl/AddressSettings.java     |  70 +++++-
 .../resources/schema/artemis-configuration.xsd  |  20 ++
 .../core/config/impl/FileConfigurationTest.java |  10 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  35 +++
 .../test/resources/artemis-configuration.xsd    |  20 ++
 .../jms/client/ConsumerDelayDispatchTest.java   | 223 +++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java     |  10 +
 .../persistence/QueueConfigRestartTest.java     |  46 ++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  35 +++
 .../core/server/impl/fakes/FakePostOffice.java  |   2 +
 29 files changed, 1064 insertions(+), 74 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-856 - Support consumersBeforeDispatch and delayBeforeDispatch

Posted by cl...@apache.org.
ARTEMIS-856 - Support consumersBeforeDispatch and delayBeforeDispatch

https://issues.apache.org/jira/browse/ARTEMIS-856

This is equivalent to consumersBeforeDispatchStarts and timeBeforeDispatchStarts in ActiveMQ 5.x

http://activemq.apache.org/message-groups.html

This is addressing one of the items on the artemis roadmap: http://activemq.apache.org/activemq-artemis-roadmap.html

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8a9835a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8a9835a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8a9835a3

Branch: refs/heads/master
Commit: 8a9835a3947f99ea832fb31fd763174697c3570b
Parents: f0c1362
Author: Michael André Pearce <mi...@me.com>
Authored: Fri Jul 6 07:44:15 2018 +0100
Committer: Michael André Pearce <mi...@me.com>
Committed: Tue Jul 31 16:16:51 2018 +0100

----------------------------------------------------------------------
 .../activemq/artemis/utils/BooleanUtil.java     |  28 +++
 .../config/ActiveMQDefaultConfiguration.java    |  12 +
 .../core/management/ActiveMQServerControl.java  |  61 +++++
 .../core/config/CoreQueueConfiguration.java     |  52 ++++-
 .../deployers/impl/FileConfigurationParser.java |  20 +-
 .../impl/ActiveMQServerControlImpl.java         |  33 ++-
 .../core/persistence/QueueBindingInfo.java      |   8 +
 .../journal/AbstractJournalStorageManager.java  |   2 +-
 .../codec/PersistentQueueBindingEncoding.java   |  48 +++-
 .../artemis/core/postoffice/PostOffice.java     |   2 +
 .../core/postoffice/impl/PostOfficeImpl.java    |  10 +
 .../artemis/core/server/ActiveMQServer.java     |  27 +++
 .../activemq/artemis/core/server/Queue.java     |  14 ++
 .../artemis/core/server/QueueConfig.java        |  39 +++-
 .../core/server/impl/ActiveMQServerImpl.java    | 125 +++++++++--
 .../core/server/impl/LastValueQueue.java        |   4 +-
 .../server/impl/PostOfficeJournalLoader.java    |   2 +
 .../core/server/impl/QueueFactoryImpl.java      |   6 +-
 .../artemis/core/server/impl/QueueImpl.java     | 174 ++++++++++++---
 .../core/settings/impl/AddressSettings.java     |  70 +++++-
 .../resources/schema/artemis-configuration.xsd  |  20 ++
 .../core/config/impl/FileConfigurationTest.java |  10 +-
 .../impl/ScheduledDeliveryHandlerTest.java      |  35 +++
 .../test/resources/artemis-configuration.xsd    |  20 ++
 .../jms/client/ConsumerDelayDispatchTest.java   | 223 +++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java     |  10 +
 .../persistence/QueueConfigRestartTest.java     |  46 ++++
 .../unit/core/postoffice/impl/FakeQueue.java    |  35 +++
 .../core/server/impl/fakes/FakePostOffice.java  |   2 +
 29 files changed, 1064 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BooleanUtil.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BooleanUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BooleanUtil.java
new file mode 100644
index 0000000..91c02ad
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/BooleanUtil.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils;
+
+public class BooleanUtil {
+
+   public static int toInt(boolean value) {
+      return value ? 1 : 0;
+   }
+
+   public static boolean toBoolean(int value) {
+      return value != 0;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 70ba314..45397f3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -473,6 +473,10 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;
 
+   public static final int DEFAULT_CONSUMERS_BEFORE_DISPATCH = 0;
+
+   public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1;
+
    public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;
 
    public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig.";
@@ -1302,6 +1306,14 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_PURGE_ON_NO_CONSUMERS;
    }
 
+   public static int getDefaultConsumersBeforeDispatch() {
+      return DEFAULT_CONSUMERS_BEFORE_DISPATCH;
+   }
+
+   public static long getDefaultDelayBeforeDispatch() {
+      return DEFAULT_DELAY_BEFORE_DISPATCH;
+   }
+
    public static String getInternalNamingPrefix() {
       return DEFAULT_INTERNAL_NAMING_PREFIX;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 234a2d5..6ce945c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -581,6 +581,10 @@ public interface ActiveMQServerControl {
     * @param durable            is the queue durable?
     * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
     * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
+    * @param exclusive if the queue should route exclusively to one consumer
+    * @param lastValue use last-value semantics
+    * @param consumersBeforeDispatch number of consumers needed before dispatch can start
+    * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
     * @param autoCreateAddress  create an address with default values should a matching address not be found
     * @return a textual summary of the queue
     * @throws Exception
@@ -593,9 +597,42 @@ public interface ActiveMQServerControl {
                       @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
                       @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
                       @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
+                      @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") boolean exclusive,
+                      @Parameter(name = "lastValue", desc = "Use last-value semantics") boolean lastValue,
+                      @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") int consumersBeforeDispatch,
+                      @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") long delayBeforeDispatch,
                       @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
 
    /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
+    *
+    * @param address            address to bind the queue to
+    * @param routingType        the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+    * @param name               name of the queue
+    * @param filterStr          filter of the queue
+    * @param durable            is the queue durable?
+    * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
+    * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
+    * @param autoCreateAddress  create an address with default values should a matching address not be found
+    * @return a textual summary of the queue
+    * @throws Exception
+    */
+   @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
+   String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                      @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+                      @Parameter(name = "name", desc = "Name of the queue") String name,
+                      @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
+                      @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                      @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
+                      @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
+                      @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;
+
+
+   /**
     * Update a queue.
     *
     * @param name               name of the queue
@@ -652,6 +689,30 @@ public interface ActiveMQServerControl {
                       @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;
 
    /**
+    * Update a queue
+    *
+    * @param name               name of the queue
+    * @param routingType        the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+    * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
+    * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
+    * @param exclusive          if the queue should route exclusively to one consumer
+    * @param consumersBeforeDispatch number of consumers needed before dispatch can start
+    * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
+    * @param user               the user associated with this queue
+    * @return
+    * @throws Exception
+    */
+   @Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
+   String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
+                      @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+                      @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
+                      @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
+                      @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
+                      @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
+                      @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
+                      @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;
+
+   /**
     * Deploy a durable queue.
     * <br>
     * If {@code address} is {@code null} it will be defaulted to {@code name}.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index f301b90..2ccae2d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -39,14 +39,16 @@ public class CoreQueueConfiguration implements Serializable {
 
    private Boolean lastValue;
 
-   private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
+   private Integer maxConsumers;
+
+   private Integer consumersBeforeDispatch;
+
+   private Long delayBeforeDispatch;
 
    private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
 
    private RoutingType routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType();
 
-   private boolean maxConsumerConfigured = false;
-
    public CoreQueueConfiguration() {
    }
 
@@ -78,13 +80,12 @@ public class CoreQueueConfiguration implements Serializable {
       return lastValue;
    }
 
-   public boolean isMaxConsumerConfigured() {
-      return maxConsumerConfigured;
+   public Integer getConsumersBeforeDispatch() {
+      return consumersBeforeDispatch;
    }
 
-   public CoreQueueConfiguration setMaxConsumerConfigured(boolean maxConsumerConfigured) {
-      this.maxConsumerConfigured = maxConsumerConfigured;
-      return this;
+   public Long getDelayBeforeDispatch() {
+      return delayBeforeDispatch;
    }
 
    /**
@@ -128,6 +129,22 @@ public class CoreQueueConfiguration implements Serializable {
    }
 
    /**
+    * @param consumersBeforeDispatch for this queue, default is 0 (dispatch as soon as 1 consumer)
+    */
+   public CoreQueueConfiguration setConsumersBeforeDispatch(Integer consumersBeforeDispatch) {
+      this.consumersBeforeDispatch = consumersBeforeDispatch;
+      return this;
+   }
+
+   /**
+    * @param delayBeforeDispatch for this queue, default is 0 (start dispatch with no delay)
+    */
+   public CoreQueueConfiguration setDelayBeforeDispatch(Long delayBeforeDispatch) {
+      this.delayBeforeDispatch = delayBeforeDispatch;
+      return this;
+   }
+
+   /**
     * @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false
     */
    public CoreQueueConfiguration setPurgeOnNoConsumers(Boolean purgeOnNoConsumers) {
@@ -157,7 +174,7 @@ public class CoreQueueConfiguration implements Serializable {
       return purgeOnNoConsumers;
    }
 
-   public int getMaxConsumers() {
+   public Integer getMaxConsumers() {
       return maxConsumers;
    }
 
@@ -182,7 +199,8 @@ public class CoreQueueConfiguration implements Serializable {
       result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode());
       result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
       result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
-      result = prime * result + (maxConsumerConfigured ? 1331 : 1337);
+      result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
+      result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
       return result;
    }
 
@@ -202,8 +220,6 @@ public class CoreQueueConfiguration implements Serializable {
          return false;
       if (durable != other.durable)
          return false;
-      if (maxConsumerConfigured != other.maxConsumerConfigured)
-         return false;
       if (filterString == null) {
          if (other.filterString != null)
             return false;
@@ -237,6 +253,18 @@ public class CoreQueueConfiguration implements Serializable {
       } else if (!lastValue.equals(other.lastValue)) {
          return false;
       }
+      if (consumersBeforeDispatch == null) {
+         if (other.consumersBeforeDispatch != null)
+            return false;
+      } else if (!consumersBeforeDispatch.equals(other.consumersBeforeDispatch)) {
+         return false;
+      }
+      if (delayBeforeDispatch == null) {
+         if (other.delayBeforeDispatch != null)
+            return false;
+      } else if (!delayBeforeDispatch.equals(other.delayBeforeDispatch)) {
+         return false;
+      }
       return true;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 27badac..6c6a94c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -183,6 +183,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String DEFAULT_EXCLUSIVE_NODE_NAME = "default-exclusive-queue";
 
+   private static final String DEFAULT_CONSUMERS_BEFORE_DISPATCH = "default-consumers-before-dispatch";
+
+   private static final String DEFAULT_DELAY_BEFORE_DISPATCH = "default-delay-before-dispatch";
+
    private static final String REDISTRIBUTION_DELAY_NODE_NAME = "redistribution-delay";
 
    private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
@@ -1050,6 +1054,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setDefaultPurgeOnNoConsumers(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_MAX_CONSUMERS.equalsIgnoreCase(name)) {
             addressSettings.setDefaultMaxConsumers(XMLUtil.parseInt(child));
+         } else if (DEFAULT_CONSUMERS_BEFORE_DISPATCH.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultConsumersBeforeDispatch(XMLUtil.parseInt(child));
+         } else if (DEFAULT_DELAY_BEFORE_DISPATCH.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultDelayBeforeDispatch(XMLUtil.parseLong(child));
          } else if (DEFAULT_QUEUE_ROUTING_TYPE.equalsIgnoreCase(name)) {
             String value = getTrimmedTextContent(child);
             Validators.ROUTING_TYPE.validate(DEFAULT_QUEUE_ROUTING_TYPE, value);
@@ -1093,12 +1101,13 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       String address = null;
       String filterString = null;
       boolean durable = true;
-      boolean maxConumserConfigured = false;
-      int maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
+      Integer maxConsumers = null;
       boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
       String user = null;
       Boolean exclusive = null;
       Boolean lastValue = null;
+      Integer consumersBeforeDispatch = null;
+      Long delayBeforeDispatch = null;
 
       NamedNodeMap attributes = node.getAttributes();
       for (int i = 0; i < attributes.getLength(); i++) {
@@ -1106,13 +1115,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          if (item.getNodeName().equals("max-consumers")) {
             maxConsumers = Integer.parseInt(item.getNodeValue());
             Validators.MAX_QUEUE_CONSUMERS.validate(name, maxConsumers);
-            maxConumserConfigured = true;
          } else if (item.getNodeName().equals("purge-on-no-consumers")) {
             purgeOnNoConsumers = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("exclusive")) {
             exclusive = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("last-value")) {
             lastValue = Boolean.parseBoolean(item.getNodeValue());
+         } else if (item.getNodeName().equals("consumers-before-dispatch")) {
+            consumersBeforeDispatch = Integer.parseInt(item.getNodeValue());
+         } else if (item.getNodeName().equals("delay-before-dispatch")) {
+            delayBeforeDispatch = Long.parseLong(item.getNodeValue());
          }
       }
 
@@ -1132,7 +1144,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       }
 
       return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setUser(user)
-                                         .setExclusive(exclusive).setLastValue(lastValue).setMaxConsumerConfigured(maxConumserConfigured);
+                                         .setExclusive(exclusive).setLastValue(lastValue).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch);
    }
 
    protected CoreAddressConfiguration parseAddressConfiguration(final Node node) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 9ed7acd..ee190c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -806,6 +806,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              int maxConsumers,
                              boolean purgeOnNoConsumers,
                              boolean autoCreateAddress) throws Exception {
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address);
+      return createQueue(address, routingType, name, filterStr, durable, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultPurgeOnNoConsumers(), addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), addressSettings.isAutoCreateAddresses());
+   }
+
+   @Override
+   public String createQueue(String address,
+                             String routingType,
+                             String name,
+                             String filterStr,
+                             boolean durable,
+                             int maxConsumers,
+                             boolean purgeOnNoConsumers,
+                             boolean exclusive,
+                             boolean lastValue,
+                             int consumersBeforeDispatch,
+                             long delayBeforeDispatch,
+                             boolean autoCreateAddress) throws Exception {
       checkStarted();
 
       clearIO();
@@ -816,7 +833,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, autoCreateAddress);
+         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
          return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
       } catch (ActiveMQException e) {
          throw new IllegalStateException(e.getMessage());
@@ -851,12 +868,24 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
                              String user) throws Exception {
+      return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+   }
+
+   @Override
+   public String updateQueue(String name,
+                             String routingType,
+                             Integer maxConsumers,
+                             Boolean purgeOnNoConsumers,
+                             Boolean exclusive,
+                             Integer consumersBeforeDispatch,
+                             Long delayBeforeDispatch,
+                             String user) throws Exception {
       checkStarted();
 
       clearIO();
 
       try {
-         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, user);
+         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
          if (queue == null) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 7e18311..ebc86fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -62,6 +62,14 @@ public interface QueueBindingInfo {
 
    void setLastValue(boolean lastValue);
 
+   int getConsumersBeforeDispatch();
+
+   void setConsumersBeforeDispatch(int consumersBeforeDispatch);
+
+   long getDelayBeforeDispatch();
+
+   void setDelayBeforeDispatch(long delayBeforeDispatch);
+
    byte getRoutingType();
 
    void setRoutingType(byte routingType);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index dc2e20b..7c821a9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1275,7 +1275,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getRoutingType().getType());
+      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType());
 
       readLock();
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 2ab4396..0cfe67c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -50,6 +50,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public boolean lastValue;
 
+   public int consumersBeforeDispatch;
+
+   public long delayBeforeDispatch;
+
    public byte routingType;
 
    public PersistentQueueBindingEncoding() {
@@ -76,6 +80,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          exclusive +
          ", lastValue=" +
          lastValue +
+         ", consumersBeforeDispatch=" +
+         consumersBeforeDispatch +
+         ", delayBeforeDispatch=" +
+         delayBeforeDispatch +
          ", routingType=" +
          routingType +
          "]";
@@ -90,6 +98,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
                                          final boolean purgeOnNoConsumers,
                                          final boolean exclusive,
                                          final boolean lastValue,
+                                         final int consumersBeforeDispatch,
+                                         final long delayBeforeDispatch,
                                          final byte routingType) {
       this.name = name;
       this.address = address;
@@ -100,6 +110,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
       this.lastValue = lastValue;
+      this.consumersBeforeDispatch = consumersBeforeDispatch;
+      this.delayBeforeDispatch = delayBeforeDispatch;
       this.routingType = routingType;
    }
 
@@ -196,6 +208,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public int getConsumersBeforeDispatch() {
+      return consumersBeforeDispatch;
+   }
+
+   @Override
+   public void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+      this.consumersBeforeDispatch = consumersBeforeDispatch;
+   }
+
+   @Override
+   public long getDelayBeforeDispatch() {
+      return delayBeforeDispatch;
+   }
+
+   @Override
+   public void setDelayBeforeDispatch(long delayBeforeDispatch) {
+      this.delayBeforeDispatch = delayBeforeDispatch;
+   }
+
+   @Override
    public byte getRoutingType() {
       return routingType;
    }
@@ -246,6 +278,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       } else {
          lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
       }
+      if (buffer.readableBytes() > 0) {
+         consumersBeforeDispatch = buffer.readInt();
+      } else {
+         consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
+      }
+      if (buffer.readableBytes() > 0) {
+         delayBeforeDispatch = buffer.readLong();
+      } else {
+         delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
+      }
    }
 
    @Override
@@ -260,6 +302,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       buffer.writeByte(routingType);
       buffer.writeBoolean(exclusive);
       buffer.writeBoolean(lastValue);
+      buffer.writeInt(consumersBeforeDispatch);
+      buffer.writeLong(delayBeforeDispatch);
    }
 
    @Override
@@ -271,7 +315,9 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          DataConstants.SIZE_BOOLEAN +
          DataConstants.SIZE_BYTE +
          DataConstants.SIZE_BOOLEAN +
-         DataConstants.SIZE_BOOLEAN;
+         DataConstants.SIZE_BOOLEAN +
+         DataConstants.SIZE_INT +
+         DataConstants.SIZE_LONG;
    }
 
    private SimpleString createMetadata() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index d95526d..ce1fcfd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -69,6 +69,8 @@ public interface PostOffice extends ActiveMQComponent {
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
+                            Integer consumersBeforeDispatch,
+                            Long delayBeforeDispatch,
                             SimpleString user) throws Exception;
 
    List<Queue> listQueuesForAddress(SimpleString address) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 21a7504..247b9ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -468,6 +468,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
+                                   Integer consumersBeforeDispatch,
+                                   Long delayBeforeDispatch,
                                    SimpleString user) throws Exception {
       synchronized (addressLock) {
          final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
@@ -512,6 +514,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             changed = true;
             queue.setExclusive(exclusive);
          }
+         if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
+            changed = true;
+            queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
+         }
+         if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) {
+            changed = true;
+            queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue());
+         }
          if (logger.isDebugEnabled()) {
             if (user == null && queue.getUser() != null) {
                logger.debug("Ignoring updating Queue to a NULL user");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index cfc5bb9..e15feb4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -338,6 +338,10 @@ public interface ActiveMQServer extends ServiceComponent {
    void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
                           SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue) throws Exception;
 
+   void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
+                          SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue,
+                          int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
+
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      boolean durable, boolean temporary) throws Exception;
 
@@ -349,6 +353,10 @@ public interface ActiveMQServer extends ServiceComponent {
                      boolean autoCreateAddress) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+                     boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive,
+                     boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+
+   Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
                      Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
 
@@ -360,6 +368,11 @@ public interface ActiveMQServer extends ServiceComponent {
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
                      Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception;
 
+   Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
+                     SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
+                     Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, Integer consumersBeforeDispatch,
+                     Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
@@ -368,6 +381,11 @@ public interface ActiveMQServer extends ServiceComponent {
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception;
 
+   Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+                     SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
+                     boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch,
+                     long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+
    @Deprecated
    Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
 
@@ -454,6 +472,15 @@ public interface ActiveMQServer extends ServiceComponent {
                      Boolean exclusive,
                      String user) throws Exception;
 
+   Queue updateQueue(String name,
+                     RoutingType routingType,
+                     Integer maxConsumers,
+                     Boolean purgeOnNoConsumers,
+                     Boolean exclusive,
+                     Integer consumersBeforeDispatch,
+                     Long delayBeforeDispatch,
+                     String user) throws Exception;
+
    /*
             * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
             * replace any factories with the same protocol

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 0e16718..70051c0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -68,6 +68,20 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void setPurgeOnNoConsumers(boolean value);
 
+   int getConsumersBeforeDispatch();
+
+   void setConsumersBeforeDispatch(int consumersBeforeDispatch);
+
+   long getDelayBeforeDispatch();
+
+   void setDelayBeforeDispatch(long delayBeforeDispatch);
+
+   long getDispatchStartTime();
+
+   boolean isDispatching();
+
+   void setDispatching(boolean dispatching);
+
    boolean isExclusive();
 
    void setExclusive(boolean value);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 75f859d..c83e08a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -40,6 +40,8 @@ public final class QueueConfig {
    private final boolean exclusive;
    private final boolean lastValue;
    private final boolean purgeOnNoConsumers;
+   private final int consumersBeforeDispatch;
+   private final long delayBeforeDispatch;
 
    public static final class Builder {
 
@@ -57,6 +59,8 @@ public final class QueueConfig {
       private boolean exclusive;
       private boolean lastValue;
       private boolean purgeOnNoConsumers;
+      private int consumersBeforeDispatch;
+      private long delayBeforeDispatch;
 
       private Builder(final long id, final SimpleString name) {
          this(id, name, name);
@@ -77,6 +81,8 @@ public final class QueueConfig {
          this.exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
          this.lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
          this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
+         this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
+         this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
          validateState();
       }
 
@@ -138,6 +144,15 @@ public final class QueueConfig {
          return this;
       }
 
+      public Builder consumersBeforeDispatch(final int consumersBeforeDispatch) {
+         this.consumersBeforeDispatch = consumersBeforeDispatch;
+         return this;
+      }
+
+      public Builder delayBeforeDispatch(final long delayBeforeDispatch) {
+         this.delayBeforeDispatch = delayBeforeDispatch;
+         return this;
+      }
 
       public Builder purgeOnNoConsumers(final boolean purgeOnNoConsumers) {
          this.purgeOnNoConsumers = purgeOnNoConsumers;
@@ -170,7 +185,7 @@ public final class QueueConfig {
          } else {
             pageSubscription = null;
          }
-         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, purgeOnNoConsumers);
+         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers);
       }
 
    }
@@ -216,6 +231,8 @@ public final class QueueConfig {
                        final int maxConsumers,
                        final boolean exclusive,
                        final boolean lastValue,
+                       final int consumersBeforeDispatch,
+                       final long delayBeforeDispatch,
                        final boolean purgeOnNoConsumers) {
       this.id = id;
       this.address = address;
@@ -231,6 +248,8 @@ public final class QueueConfig {
       this.exclusive = exclusive;
       this.lastValue = lastValue;
       this.maxConsumers = maxConsumers;
+      this.consumersBeforeDispatch = consumersBeforeDispatch;
+      this.delayBeforeDispatch = delayBeforeDispatch;
    }
 
    public long id() {
@@ -289,6 +308,14 @@ public final class QueueConfig {
       return routingType;
    }
 
+   public int consumersBeforeDispatch() {
+      return consumersBeforeDispatch;
+   }
+
+   public long delayBeforeDispatch() {
+      return delayBeforeDispatch;
+   }
+
    @Override
    public boolean equals(Object o) {
       if (this == o)
@@ -324,6 +351,12 @@ public final class QueueConfig {
          return false;
       if (purgeOnNoConsumers != that.purgeOnNoConsumers)
          return false;
+      if (consumersBeforeDispatch != that.consumersBeforeDispatch)
+         return false;
+      if (delayBeforeDispatch != that.delayBeforeDispatch)
+         return false;
+      if (purgeOnNoConsumers != that.purgeOnNoConsumers)
+         return false;
       return user != null ? user.equals(that.user) : that.user == null;
 
    }
@@ -343,6 +376,8 @@ public final class QueueConfig {
       result = 31 * result + maxConsumers;
       result = 31 * result + (exclusive ? 1 : 0);
       result = 31 * result + (lastValue ? 1 : 0);
+      result = 31 * result + consumersBeforeDispatch;
+      result = 31 * result + Long.hashCode(delayBeforeDispatch);
       result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
       return result;
    }
@@ -363,6 +398,8 @@ public final class QueueConfig {
          + ", maxConsumers=" + maxConsumers
          + ", exclusive=" + exclusive
          + ", lastValue=" + lastValue
+         + ", consumersBeforeDispatch=" + consumersBeforeDispatch
+         + ", delayBeforeDispatch=" + delayBeforeDispatch
          + ", purgeOnNoConsumers=" + purgeOnNoConsumers + '}';
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 4abda08..a2cdf55 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1663,6 +1663,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public Queue createQueue(final SimpleString address,
+                            final RoutingType routingType,
+                            final SimpleString queueName,
+                            final SimpleString filter,
+                            final boolean durable,
+                            final boolean temporary,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean exclusive,
+                            final boolean lastValue,
+                            final int consumersBeforeDispatch,
+                            final long delayBeforeDispatch,
+                            final boolean autoCreateAddress) throws Exception {
+      return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
+   }
+
+   @Override
    @Deprecated
    public Queue createQueue(SimpleString address,
                             RoutingType routingType,
@@ -1681,12 +1698,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreateAddress);
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception {
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, autoCreateAddress);
+      AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
+   }
+
+   @Override
+   public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
    }
 
 
@@ -1695,7 +1718,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
-      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreateAddress);
+      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
+   }
+
+   @Override
+   public Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+                            SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
+                            boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception {
+      AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
+      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
    }
 
 
@@ -1731,6 +1762,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                  boolean purgeOnNoConsumers,
                                  boolean exclusive,
                                  boolean lastValue) throws Exception {
+      AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString());
+      createSharedQueue(address, routingType, name, filterString, user, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch());
+   }
+
+   @Override
+   public void createSharedQueue(final SimpleString address,
+                                 RoutingType routingType,
+                                 final SimpleString name,
+                                 final SimpleString filterString,
+                                 final SimpleString user,
+                                 boolean durable,
+                                 int maxConsumers,
+                                 boolean purgeOnNoConsumers,
+                                 boolean exclusive,
+                                 boolean lastValue,
+                                 int consumersBeforeDispatch,
+                                 long delayBeforeDispatch) throws Exception {
       //force the old contract about address
       if (address == null) {
          throw new NullPointerException("address can't be null!");
@@ -1744,7 +1792,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
-      final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, true);
+      final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
 
       if (!queue.getAddress().equals(address)) {
          throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@@ -2541,21 +2589,22 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          ActiveMQServerLogger.LOGGER.deployQueue(config.getName(), config.getAddress());
          AddressSettings as = addressSettingsRepository.getMatch(config.getAddress());
          // determine if there is an address::queue match; update it if so
-         int maxConsumerAddressSetting = as.getDefaultMaxConsumers();
-         int maxConsumerQueueConfig = config.getMaxConsumers();
-         int maxConsumer = (config.isMaxConsumerConfigured()) ? maxConsumerQueueConfig : maxConsumerAddressSetting;
+         int maxConsumers =  config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers();
+         boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive();
+         boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue();
+         int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch();
+         long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
+
          if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
-            updateQueue(config.getName(), config.getRoutingType(), maxConsumer, config.getPurgeOnNoConsumers(),
-                        config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(),
-                        config.getUser());
+            updateQueue(config.getName(), config.getRoutingType(), maxConsumers, config.getPurgeOnNoConsumers(),
+                    isExclusive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser());
          } else {
             // if the address::queue doesn't exist then create it
             try {
                createQueue(SimpleString.toSimpleString(config.getAddress()), config.getRoutingType(),
                            queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()),
-                           config.isDurable(),false,false,false,false,maxConsumer,config.getPurgeOnNoConsumers(),
-                           config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive(),
-                           config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue(), true);
+                           config.isDurable(),false,false,false,false, maxConsumers, config.getPurgeOnNoConsumers(),
+                           isExclusive, isLastValue, consumersBeforeDispatch, delayBeforeDispatch, true);
             } catch (ActiveMQQueueExistsException e) {
                // the queue may exist on a *different* address
                ActiveMQServerLogger.LOGGER.warn(e.getMessage());
@@ -2742,6 +2791,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final boolean purgeOnNoConsumers,
                             final boolean exclusive,
                             final boolean lastValue,
+                            final int consumersBeforeDispatch,
+                            final long delayBeforeDispatch,
                             final boolean autoCreateAddress) throws Exception {
       final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
       if (binding != null) {
@@ -2784,7 +2835,21 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(rt, info.getName().toString(), info.getRoutingTypes());
       }
 
-      final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(rt).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).exclusive(exclusive).lastValue(lastValue).build();
+      final QueueConfig queueConfig = queueConfigBuilder
+              .filter(filter)
+              .pagingManager(pagingManager)
+              .user(user)
+              .durable(durable)
+              .temporary(temporary)
+              .autoCreated(autoCreated)
+              .routingType(rt)
+              .maxConsumers(maxConsumers)
+              .purgeOnNoConsumers(purgeOnNoConsumers)
+              .exclusive(exclusive)
+              .lastValue(lastValue)
+              .consumersBeforeDispatch(consumersBeforeDispatch)
+              .delayBeforeDispatch(delayBeforeDispatch)
+              .build();
 
       callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
 
@@ -2852,6 +2917,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final boolean purgeOnNoConsumers,
                             final boolean exclusive,
                             final boolean lastValue,
+                            final int consumersBeforeDispatch,
+                            final long delayBeforeDispatch,
                             final boolean autoCreateAddress) throws Exception {
 
       final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
@@ -2893,7 +2960,20 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes());
       }
 
-      final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).exclusive(exclusive).lastValue(lastValue).build();
+      final QueueConfig queueConfig = queueConfigBuilder
+              .filter(filter)
+              .pagingManager(pagingManager)
+              .user(user)
+              .durable(durable)
+              .temporary(temporary)
+              .autoCreated(autoCreated).routingType(routingType)
+              .maxConsumers(maxConsumers)
+              .purgeOnNoConsumers(purgeOnNoConsumers)
+              .exclusive(exclusive)
+              .lastValue(lastValue)
+              .consumersBeforeDispatch(consumersBeforeDispatch)
+              .delayBeforeDispatch(delayBeforeDispatch)
+              .build();
 
       callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null);
 
@@ -2963,14 +3043,27 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null, null);
    }
 
+   @Deprecated
+   @Override
+   public Queue updateQueue(String name,
+                            RoutingType routingType,
+                            Integer maxConsumers,
+                            Boolean purgeOnNoConsumers,
+                            Boolean exclusive,
+                            String user) throws Exception {
+      return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+   }
+
    @Override
    public Queue updateQueue(String name,
                             RoutingType routingType,
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
+                            Integer consumersBeforeDispatch,
+                            Long delayBeforeDispatch,
                             String user) throws Exception {
-      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, SimpleString.toSimpleString(user));
+      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user));
       if (queueBinding != null) {
          final Queue queue = queueBinding.getQueue();
          return queue;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index fc96591..7c2ffee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -63,6 +63,8 @@ public class LastValueQueue extends QueueImpl {
                          final RoutingType routingType,
                          final Integer maxConsumers,
                          final Boolean exclusive,
+                         final Integer consumersBeforeDispatch,
+                         final Long delayBeforeDispatch,
                          final Boolean purgeOnNoConsumers,
                          final ScheduledExecutorService scheduledExecutor,
                          final PostOffice postOffice,
@@ -71,7 +73,7 @@ public class LastValueQueue extends QueueImpl {
                          final ArtemisExecutor executor,
                          final ActiveMQServer server,
                          final QueueFactory factory) {
-      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index f9ec964..59a318c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -153,6 +153,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
             .maxConsumers(queueBindingInfo.getMaxConsumers())
             .exclusive(queueBindingInfo.isExclusive())
             .lastValue(queueBindingInfo.isLastValue())
+            .consumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch())
+            .delayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch())
             .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()));
          final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
          queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 7f23d09..24b36e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -74,9 +74,9 @@ public class QueueFactoryImpl implements QueueFactory {
    public Queue createQueueWith(final QueueConfig config) {
       final Queue queue;
       if (config.isLastValue()) {
-         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       }
 
       server.getCriticalAnalyzer().add(queue);
@@ -102,7 +102,7 @@ public class QueueFactoryImpl implements QueueFactory {
 
       Queue queue;
       if (addressSettings.isDefaultLastValueQueue()) {
-         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
+         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(),  scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       } else {
          queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8a9835a3/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index bc5c0c9..2656217 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -37,8 +37,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
 
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNullRefException;
@@ -87,6 +89,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.BooleanUtil;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.ReusableLatch;
@@ -113,6 +116,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    protected static final int CRITICAL_CONSUMER = 3;
 
    private static final Logger logger = Logger.getLogger(QueueImpl.class);
+   private static final AtomicIntegerFieldUpdater dispatchingUpdater = AtomicIntegerFieldUpdater.newUpdater(QueueImpl.class, "dispatching");
+   private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime");
 
    public static final int REDISTRIBUTOR_BATCH_SIZE = 100;
 
@@ -216,7 +221,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final SimpleString address;
 
-   private Redistributor redistributor;
+   private ConsumerHolder<Redistributor> redistributor;
 
    private ScheduledFuture<?> redistributorFuture;
 
@@ -268,6 +273,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final QueueFactory factory;
 
+   public volatile int dispatching = 0;
+
+   public volatile long dispatchStartTime = -1;
+
+   private int consumersBeforeDispatch = 0;
+
+   private long delayBeforeDispatch = 0;
+
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -413,6 +427,31 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                     final ArtemisExecutor executor,
                     final ActiveMQServer server,
                     final QueueFactory factory) {
+      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+   }
+
+   public QueueImpl(final long id,
+                    final SimpleString address,
+                    final SimpleString name,
+                    final Filter filter,
+                    final PageSubscription pageSubscription,
+                    final SimpleString user,
+                    final boolean durable,
+                    final boolean temporary,
+                    final boolean autoCreated,
+                    final RoutingType routingType,
+                    final Integer maxConsumers,
+                    final Boolean exclusive,
+                    final Integer consumersBeforeDispatch,
+                    final Long delayBeforeDispatch,
+                    final Boolean purgeOnNoConsumers,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final PostOffice postOffice,
+                    final StorageManager storageManager,
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final ArtemisExecutor executor,
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
       super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
 
       this.id = id;
@@ -441,6 +480,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.purgeOnNoConsumers = purgeOnNoConsumers == null ? ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers() : purgeOnNoConsumers;
 
+      this.consumersBeforeDispatch = consumersBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch() : consumersBeforeDispatch;
+
+      this.delayBeforeDispatch = delayBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : delayBeforeDispatch;
+
       this.postOffice = postOffice;
 
       this.storageManager = storageManager;
@@ -505,6 +548,48 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public int getConsumersBeforeDispatch() {
+      return consumersBeforeDispatch;
+   }
+
+   @Override
+   public synchronized void setConsumersBeforeDispatch(int consumersBeforeDispatch) {
+      this.consumersBeforeDispatch = consumersBeforeDispatch;
+   }
+
+   @Override
+   public long getDelayBeforeDispatch() {
+      return delayBeforeDispatch;
+   }
+
+   @Override
+   public synchronized void setDelayBeforeDispatch(long delayBeforeDispatch) {
+      this.delayBeforeDispatch = delayBeforeDispatch;
+   }
+
+   @Override
+   public long getDispatchStartTime() {
+      return dispatchStartTimeUpdater.get(this);
+   }
+
+   @Override
+   public boolean isDispatching() {
+      return BooleanUtil.toBoolean(dispatchingUpdater.get(this));
+   }
+
+   @Override
+   public synchronized void setDispatching(boolean dispatching) {
+      if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(!dispatching), BooleanUtil.toInt(dispatching))) {
+         if (dispatching) {
+            dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
+         } else {
+            dispatchStartTimeUpdater.set(this, -1);
+         }
+      }
+   }
+
+
+   @Override
    public boolean isLastValue() {
       return false;
    }
@@ -867,6 +952,21 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   private boolean canDispatch() {
+      boolean canDispatch = BooleanUtil.toBoolean(dispatchingUpdater.get(this));
+      if (canDispatch) {
+         return true;
+      } else {
+         long currentDispatchStartTime = dispatchStartTimeUpdater.get(this);
+         if (currentDispatchStartTime != -1 && currentDispatchStartTime < System.currentTimeMillis()) {
+            dispatchingUpdater.set(this, BooleanUtil.toInt(true));
+            return true;
+         } else {
+            return false;
+         }
+      }
+   }
+
    @Override
    public void addConsumer(final Consumer consumer) throws Exception {
       if (logger.isDebugEnabled()) {
@@ -876,7 +976,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_CONSUMER);
       try {
          synchronized (this) {
-
             if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumersCount.get() >= maxConsumers) {
                throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
             }
@@ -892,7 +991,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             consumerList.add(new ConsumerHolder(consumer));
 
             if (consumerSet.add(consumer)) {
-               consumersCount.incrementAndGet();
+               int currentConsumerCount = consumersCount.incrementAndGet();
+               if (delayBeforeDispatch >= 0) {
+                  dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
+               }
+               if (currentConsumerCount >= consumersBeforeDispatch) {
+                  if (dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
+                     dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
+                  }
+               }
             }
 
             if (refCountForConsumers != null) {
@@ -931,7 +1038,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
 
             if (consumerSet.remove(consumer)) {
-               consumersCount.decrementAndGet();
+               int currentConsumerCount = consumersCount.decrementAndGet();
+               boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0));
+               if (stopped) {
+                  dispatchStartTimeUpdater.set(this, -1);
+               }
             }
 
             LinkedList<SimpleString> groupsToRemove = null;
@@ -1005,11 +1116,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    @Override
    public synchronized void cancelRedistributor() throws Exception {
       if (redistributor != null) {
-         redistributor.stop();
-         Redistributor redistributorToRemove = redistributor;
+         redistributor.consumer.stop();
          redistributor = null;
-
-         removeConsumer(redistributorToRemove);
       }
 
       clearRedistributorFuture();
@@ -2265,7 +2373,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          synchronized (this) {
 
             // Need to do these checks inside the synchronized
-            if (paused || consumerList.isEmpty()) {
+            if (paused || !canDispatch() && redistributor == null) {
                return;
             }
 
@@ -2273,20 +2381,26 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                break;
             }
 
-            if (endPos < 0 || consumersChanged) {
-               consumersChanged = false;
+            ConsumerHolder<? extends Consumer> holder;
+            if (redistributor == null) {
+
+               if (endPos < 0 || consumersChanged) {
+                  consumersChanged = false;
 
-               size = consumerList.size();
+                  size = consumerList.size();
 
-               endPos = pos - 1;
+                  endPos = pos - 1;
 
-               if (endPos < 0) {
-                  endPos = size - 1;
-                  noDelivery = 0;
+                  if (endPos < 0) {
+                     endPos = size - 1;
+                     noDelivery = 0;
+                  }
                }
-            }
 
-            ConsumerHolder holder = consumerList.get(pos);
+               holder = consumerList.get(pos);
+            } else {
+               holder = redistributor;
+            }
 
             Consumer consumer = holder.consumer;
             Consumer groupConsumer = null;
@@ -2332,7 +2446,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   }
                }
 
-               if (exclusive) {
+               if (exclusive && redistributor == null) {
                   consumer = consumerList.get(0).consumer;
                }
 
@@ -2522,13 +2636,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          if (logger.isTraceEnabled()) {
             logger.trace("QueueImpl::Adding redistributor on queue " + this.toString());
          }
-         redistributor = new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE);
 
-         consumerList.add(new ConsumerHolder(redistributor));
+         redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
 
          consumersChanged = true;
 
-         redistributor.start();
+         redistributor.consumer.start();
 
          deliverAsync();
       }
@@ -2864,7 +2977,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             // this would protect any eventual bug
             return false;
          }
-         if (paused || consumerList.isEmpty()) {
+         if (paused || !canDispatch() && redistributor == null) {
             return false;
          }
 
@@ -2877,7 +2990,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          int size = consumerList.size();
 
          while (true) {
-            ConsumerHolder holder = consumerList.get(pos);
+            ConsumerHolder<? extends Consumer> holder;
+            if (redistributor == null) {
+               holder = consumerList.get(pos);
+            } else {
+               holder = redistributor;
+            }
 
             Consumer consumer = holder.consumer;
 
@@ -2895,7 +3013,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                }
             }
 
-            if (exclusive) {
+            if (exclusive && redistributor == null) {
                consumer = consumerList.get(0).consumer;
             }
 
@@ -3158,13 +3276,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // Inner classes
    // --------------------------------------------------------------------------
 
-   private static class ConsumerHolder {
+   private static class ConsumerHolder<T extends  Consumer> {
 
-      ConsumerHolder(final Consumer consumer) {
+      ConsumerHolder(final T consumer) {
          this.consumer = consumer;
       }
 
-      final Consumer consumer;
+      final T consumer;
 
       LinkedListIterator<MessageReference> iter;