You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/07/31 18:16:21 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1987 - Add consumer window size to AddressSettings

ARTEMIS-1987 - Add consumer window size to AddressSettings

Support configuring a default consumer window size via AddressSettings
which will allow sensible defaults to be used by address type


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5fc60d74
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5fc60d74
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5fc60d74

Branch: refs/heads/master
Commit: 5fc60d7437dcc5aea217103e516e50e31fe7a467
Parents: 587e455
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 25 06:42:29 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 31 14:16:11 2018 -0400

----------------------------------------------------------------------
 .../artemis/api/core/client/ClientSession.java  |  2 +
 .../core/client/impl/QueueQueryImpl.java        | 14 ++++-
 .../core/impl/ActiveMQSessionContext.java       | 34 ++++++++---
 .../SessionQueueQueryResponseMessage_V3.java    | 32 ++++++++--
 .../artemis/core/server/QueueQueryResult.java   | 11 +++-
 .../spi/core/remoting/SessionContext.java       |  3 +
 .../deployers/impl/FileConfigurationParser.java |  4 ++
 .../artemis/core/server/ServerSession.java      |  2 +
 .../core/server/impl/ActiveMQServerImpl.java    | 21 ++++---
 .../core/server/impl/ServerSessionImpl.java     |  6 ++
 .../core/settings/impl/AddressSettings.java     | 40 +++++++++++-
 .../resources/schema/artemis-configuration.xsd  |  8 +++
 .../core/config/impl/FileConfigurationTest.java |  1 +
 .../resources/ConfigurationTest-full-config.xml |  1 +
 ...ionTest-xinclude-config-address-settings.xml |  1 +
 docs/user-manual/en/address-model.md            |  5 ++
 docs/user-manual/en/flow-control.md             |  2 +-
 .../client/ConsumerWindowSizeTest.java          | 64 ++++++++++++++++++++
 18 files changed, 222 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index dbc1e89..414def3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -147,6 +147,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
       Boolean isExclusive();
 
       Boolean isLastValue();
+
+      Integer getDefaultConsumerWindowSize();
    }
 
    // Lifecycle operations ------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 8dc35a9..d377d18 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -16,9 +16,9 @@
  */
 package org.apache.activemq.artemis.core.client.impl;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.RoutingType;
 
 public class QueueQueryImpl implements ClientSession.QueueQuery {
 
@@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
 
    private final Boolean lastValue;
 
+   private final Integer defaultConsumerWindowSize;
+
    public QueueQueryImpl(final boolean durable,
                          final boolean temporary,
                          final int consumerCount,
@@ -88,7 +90,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final boolean autoCreated,
                          final boolean purgeOnNoConsumers,
                          final RoutingType routingType) {
-      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null);
+      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null, null);
    }
    public QueueQueryImpl(final boolean durable,
                          final boolean temporary,
@@ -104,7 +106,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final boolean purgeOnNoConsumers,
                          final RoutingType routingType,
                          final Boolean exclusive,
-                         final Boolean lastValue) {
+                         final Boolean lastValue,
+                         final Integer defaultConsumerWindowSize) {
       this.durable = durable;
       this.temporary = temporary;
       this.consumerCount = consumerCount;
@@ -120,6 +123,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
       this.routingType = routingType;
       this.exclusive = exclusive;
       this.lastValue = lastValue;
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
    }
 
    @Override
@@ -197,5 +201,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
       return lastValue;
    }
 
+   @Override
+   public Integer getDefaultConsumerWindowSize() {
+      return defaultConsumerWindowSize;
+   }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index fccb041..1268bba 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -16,9 +16,12 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
+
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.util.EnumSet;
@@ -29,6 +32,10 @@ import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -37,6 +44,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
@@ -85,6 +93,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@@ -115,12 +124,6 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
 import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
 import org.jboss.logging.Logger;
 
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
-
 public class ActiveMQSessionContext extends SessionContext {
 
    private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
@@ -324,8 +327,9 @@ public class ActiveMQSessionContext extends SessionContext {
       // The actual windows size that gets used is determined by the user since
       // could be overridden on the queue settings
       // The value we send is just a hint
+      final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
 
-      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+      return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
    }
 
    @Override
@@ -822,6 +826,16 @@ public class ActiveMQSessionContext extends SessionContext {
       }
    }
 
+   @Override
+   public int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException {
+      if (response instanceof SessionQueueQueryResponseMessage_V3) {
+         final Integer defaultConsumerWindowSize = ((SessionQueueQueryResponseMessage_V3) response).getDefaultConsumerWindowSize();
+         return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+      } else {
+         return ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+      }
+   }
+
    private Channel getCreateChannel() {
       return getCoreConnection().getChannel(1, -1);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index b982744..0c4c40f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -38,12 +38,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    protected Boolean lastValue;
 
+   protected Integer defaultConsumerWindowSize;
+
    public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
-      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue());
+      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue(), result.getDefaultConsumerWindowSize());
    }
 
    public SessionQueueQueryResponseMessage_V3() {
-      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null);
+      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null, null);
    }
 
    private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -60,7 +62,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
                                                final RoutingType routingType,
                                                final int maxConsumers,
                                                final Boolean exclusive,
-                                               final Boolean lastValue) {
+                                               final Boolean lastValue,
+                                               final Integer defaultConsumerWindowSize) {
       super(SESS_QUEUEQUERY_RESP_V3);
 
       this.durable = durable;
@@ -92,6 +95,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       this.exclusive = exclusive;
 
       this.lastValue = lastValue;
+
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
    }
 
    public boolean isAutoCreated() {
@@ -142,6 +147,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       this.lastValue = lastValue;
    }
 
+   public Integer getDefaultConsumerWindowSize() {
+      return defaultConsumerWindowSize;
+   }
+
+   public void setDefaultConsumerWindowSize(Integer defaultConsumerWindowSize) {
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -151,6 +164,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buffer.writeInt(maxConsumers);
       BufferHelper.writeNullableBoolean(buffer, exclusive);
       BufferHelper.writeNullableBoolean(buffer, lastValue);
+      BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
    }
 
    @Override
@@ -164,6 +178,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
          exclusive = BufferHelper.readNullableBoolean(buffer);
          lastValue = BufferHelper.readNullableBoolean(buffer);
       }
+      if (buffer.readableBytes() > 0) {
+         defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
+      }
    }
 
    @Override
@@ -176,6 +193,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       result = prime * result + maxConsumers;
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
+      result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
       return result;
    }
 
@@ -195,12 +213,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buff.append(", maxConsumers=" + maxConsumers);
       buff.append(", exclusive=" + exclusive);
       buff.append(", lastValue=" + lastValue);
+      buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
       return buff.toString();
    }
 
    @Override
    public ClientSession.QueueQuery toQueueQuery() {
-      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue());
+      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue(), getDefaultConsumerWindowSize());
    }
 
    @Override
@@ -226,6 +245,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
             return false;
       } else if (!lastValue.equals(other.lastValue))
          return false;
+      if (defaultConsumerWindowSize == null) {
+         if (other.defaultConsumerWindowSize != null)
+            return false;
+      } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
+         return false;
       if (routingType == null) {
          if (other.routingType != null)
             return false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index 6497e3f..b09d310 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -51,6 +51,8 @@ public class QueueQueryResult {
 
    private Boolean lastValue;
 
+   private Integer defaultConsumerWindowSize;
+
    public QueueQueryResult(final SimpleString name,
                            final SimpleString address,
                            final boolean durable,
@@ -65,7 +67,8 @@ public class QueueQueryResult {
                            final RoutingType routingType,
                            final int maxConsumers,
                            final Boolean exclusive,
-                           final Boolean lastValue) {
+                           final Boolean lastValue,
+                           final Integer defaultConsumerWindowSize) {
       this.durable = durable;
 
       this.temporary = temporary;
@@ -95,6 +98,8 @@ public class QueueQueryResult {
       this.exclusive = exclusive;
 
       this.lastValue = lastValue;
+
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
    }
 
    public boolean isExists() {
@@ -160,4 +165,8 @@ public class QueueQueryResult {
    public Boolean isLastValue() {
       return lastValue;
    }
+
+   public Integer getDefaultConsumerWindowSize() {
+      return defaultConsumerWindowSize;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 6571335..0d86354 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
 import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -325,6 +326,8 @@ public abstract class SessionContext {
 
    public abstract void resetMetadata(HashMap<String, String> metaDataToSend);
 
+   public abstract int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException;
+
    // Failover utility classes
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6c6a94c..dc5ba16 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -241,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
 
+   private static final String DEFAULT_CONSUMER_WINDOW_SIZE = "default-consumer-window-size";
+
 
    // Attributes ----------------------------------------------------
 
@@ -1068,6 +1070,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value);
             RoutingType routingType = RoutingType.valueOf(value);
             addressSettings.setDefaultAddressRoutingType(routingType);
+         } else if (DEFAULT_CONSUMER_WINDOW_SIZE.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
          }
       }
       return setting;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 8c2f748..6d72088 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -358,4 +358,6 @@ public interface ServerSession extends SecurityAuth {
    int getConsumerCount();
 
    int getProducerCount();
+
+   int getDefaultConsumerWindowSize(SimpleString address);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index a2cdf55..f01e0c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -865,11 +865,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
       }
 
-      boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
-      boolean defaultPurgeOnNoConsumers = getAddressSettingsRepository().getMatch(name.toString()).isDefaultPurgeOnNoConsumers();
-      int defaultMaxConsumers = getAddressSettingsRepository().getMatch(name.toString()).getDefaultMaxConsumers();
-      boolean defaultExclusiveQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultExclusiveQueue();
-      boolean defaultLastValueQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultLastValueQueue();
+      final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
+
+      boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
+      boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
+      int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
+      boolean defaultExclusiveQueue = addressSettings.isDefaultExclusiveQueue();
+      boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
+      int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
 
       QueueQueryResult response;
 
@@ -884,14 +887,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue());
+         response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), defaultConsumerWindowSize);
       } else if (name.equals(managementAddress)) {
          // make an exception for the management address (see HORNETQ-29)
-         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false);
+         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, defaultConsumerWindowSize);
       } else if (autoCreateQueues) {
-         response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue);
+         response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultConsumerWindowSize);
       } else {
-         response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null);
+         response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null, defaultConsumerWindowSize);
       }
 
       return response;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 7388e56..c0bca6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1904,4 +1904,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    public int getProducerCount() {
       return getServerProducers().size();
    }
+
+   @Override
+   public int getDefaultConsumerWindowSize(SimpleString address) {
+      AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
+      return as.getDefaultConsumerWindowSize();
+   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 6fc5019..f2eb488 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.settings.Mergeable;
@@ -178,6 +179,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private RoutingType defaultAddressRoutingType = null;
 
+   private Integer defaultConsumerWindowSize = null;
+
    //from amq5
    //make it transient
    private transient Integer queuePrefetch = null;
@@ -222,6 +225,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
       this.defaultQueueRoutingType = other.defaultQueueRoutingType;
       this.defaultAddressRoutingType = other.defaultAddressRoutingType;
+      this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
    }
 
    public AddressSettings() {
@@ -580,6 +584,21 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
    }
 
    /**
+    * @return the defaultConsumerWindowSize
+    */
+   public int getDefaultConsumerWindowSize() {
+      return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+   }
+
+   /**
+    * @param defaultConsumerWindowSize the defaultConsumerWindowSize to set
+    */
+   public AddressSettings setDefaultConsumerWindowSize(int defaultConsumerWindowSize) {
+      this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+      return this;
+   }
+
+   /**
     * merge 2 objects in to 1
     *
     * @param merged
@@ -694,6 +713,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (defaultExclusiveQueue == null) {
          defaultExclusiveQueue = merged.defaultExclusiveQueue;
       }
+      if (defaultConsumerWindowSize == null) {
+         defaultConsumerWindowSize = merged.defaultConsumerWindowSize;
+      }
       if (defaultLastValueQueue == null) {
          defaultLastValueQueue = merged.defaultLastValueQueue;
       }
@@ -811,6 +833,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
+      }
    }
 
    @Override
@@ -851,7 +877,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          DataConstants.SIZE_BYTE +
          BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
          BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
-         BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch);
+         BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch) +
+         BufferHelper.sizeOfNullableInteger(defaultConsumerWindowSize);
    }
 
    @Override
@@ -932,6 +959,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
 
+      BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
+
    }
 
    /* (non-Javadoc)
@@ -980,6 +1009,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
       result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
       result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
+      result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
       return result;
    }
 
@@ -1197,6 +1227,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
             return false;
       } else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
          return false;
+
+      if (defaultConsumerWindowSize == null) {
+         if (other.defaultConsumerWindowSize != null)
+            return false;
+      } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
+         return false;
       return true;
    }
 
@@ -1280,6 +1316,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultConsumersBeforeDispatch +
          ", defaultDelayBeforeDispatch=" +
          defaultDelayBeforeDispatch +
+         ", defaultClientWindowSize=" +
+         defaultConsumerWindowSize +
          "]";
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index e96923d..04e2931 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3011,6 +3011,14 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-consumer-window-size" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default window size for a consumer
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+            
          </xsd:all>
 
          <xsd:attribute name="match" type="xsd:string" use="required">

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 8fcac20..114779d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -346,6 +346,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers());
       assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
       assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
+      assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
 
       assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
       assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index eb561fc..186a712 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -337,6 +337,7 @@
             <default-max-consumers>15</default-max-consumers>
             <default-queue-routing-type>MULTICAST</default-queue-routing-type>
             <default-address-routing-type>ANYCAST</default-address-routing-type>
+            <default-consumer-window-size>10000</default-consumer-window-size>
          </address-setting>
       </address-settings>
       <resource-limit-settings>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 9e1ca40..443958e 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -62,5 +62,6 @@
       <default-max-consumers>15</default-max-consumers>
       <default-queue-routing-type>MULTICAST</default-queue-routing-type>
       <default-address-routing-type>ANYCAST</default-address-routing-type>
+      <default-consumer-window-size>10000</default-consumer-window-size>
    </address-setting>
 </address-settings>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/docs/user-manual/en/address-model.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index f73adf2..ac5e49b 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -790,3 +790,8 @@ types](#routing-type).
 address if the broker is unable to determine the routing-type based on the
 client and/or protocol semantics. Default is `MULTICAST`. Read more about
 [routing types](#routing-type).
+
+`default-consumer-window-size` defines the default `consumerWindowSize` value 
+for a `CORE` protocol consumer, if not defined the default will be set to 
+1 MiB (1024 * 1024 bytes). The consumer will use this value as the window size
+if the value is not set on the client. Read more about [flow control](#flow-control).

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/docs/user-manual/en/flow-control.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md
index 44fbee3..0151079 100644
--- a/docs/user-manual/en/flow-control.md
+++ b/docs/user-manual/en/flow-control.md
@@ -33,7 +33,7 @@ buffered on each consumer is determined by the `consumerWindowSize`
 parameter.
 
 By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024
-bytes).
+bytes) unless overridden via ([Address Settings](address-model.md#configuring-addresses-and-queues-via-address-settings))
 
 The value can be:
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
index d3adee0..d4298e5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
@@ -24,7 +24,9 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -32,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -1386,4 +1389,65 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testDefaultConsumerWindowSize() throws Exception {
+      ActiveMQServer messagingService = createServer(false, isNetty());
+
+      messagingService.start();
+      messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
+
+      ClientSessionFactory cf = createSessionFactory(locator);
+      ClientSession session = cf.createSession(false, true, true);
+      ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
+
+      consumer.start();
+
+      assertEquals(ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE / 2, consumer.getClientWindowSize());
+   }
+
+   @Test
+   public void testConsumerWindowSizeAddressSettings() throws Exception {
+      ActiveMQServer messagingService = createServer(false, isNetty());
+
+      final int defaultConsumerWindowSize = 1024 * 5;
+      final AddressSettings settings = new AddressSettings();
+      settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
+      messagingService.getConfiguration()
+            .getAddressesSettings().put(queueA.toString(), settings);
+
+      messagingService.start();
+      messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
+
+      ClientSessionFactory cf = createSessionFactory(locator);
+      ClientSession session = cf.createSession(false, true, true);
+      ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
+
+      session.start();
+
+      assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
+   }
+
+   @Test
+   public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
+      ActiveMQServer messagingService = createServer(false, isNetty());
+
+      final int defaultConsumerWindowSize = 1024 * 5;
+      final AddressSettings settings = new AddressSettings();
+      settings.setDefaultConsumerWindowSize(defaultConsumerWindowSize);
+      messagingService.getConfiguration()
+            .getAddressesSettings().put("#", settings);
+
+      messagingService.start();
+      messagingService.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
+
+      ClientSessionFactory cf = createSessionFactory(locator);
+      ClientSession session = cf.createSession(false, true, true);
+      ClientConsumerImpl consumer = (ClientConsumerImpl) session.createConsumer(queueA);
+      ClientConsumerImpl consumer2 = (ClientConsumerImpl) session.createConsumer(queueA);
+
+      session.start();
+
+      assertEquals(defaultConsumerWindowSize / 2, consumer.getClientWindowSize());
+      assertEquals(defaultConsumerWindowSize / 2, consumer2.getClientWindowSize());
+   }
 }