You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/07/31 18:16:21 UTC
[2/2] activemq-artemis git commit: ARTEMIS-1987 - Add consumer window size to AddressSettings
ARTEMIS-1987 - Add consumer window size to AddressSettings
Support configuring a default consumer window size via AddressSettings,
which allows sensible defaults to be used by address type.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5fc60d74
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5fc60d74
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5fc60d74
Branch: refs/heads/master
Commit: 5fc60d7437dcc5aea217103e516e50e31fe7a467
Parents: 587e455
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Wed Jul 25 06:42:29 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Jul 31 14:16:11 2018 -0400
----------------------------------------------------------------------
.../artemis/api/core/client/ClientSession.java | 2 +
.../core/client/impl/QueueQueryImpl.java | 14 ++++-
.../core/impl/ActiveMQSessionContext.java | 34 ++++++++---
.../SessionQueueQueryResponseMessage_V3.java | 32 ++++++++--
.../artemis/core/server/QueueQueryResult.java | 11 +++-
.../spi/core/remoting/SessionContext.java | 3 +
.../deployers/impl/FileConfigurationParser.java | 4 ++
.../artemis/core/server/ServerSession.java | 2 +
.../core/server/impl/ActiveMQServerImpl.java | 21 ++++---
.../core/server/impl/ServerSessionImpl.java | 6 ++
.../core/settings/impl/AddressSettings.java | 40 +++++++++++-
.../resources/schema/artemis-configuration.xsd | 8 +++
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
...ionTest-xinclude-config-address-settings.xml | 1 +
docs/user-manual/en/address-model.md | 5 ++
docs/user-manual/en/flow-control.md | 2 +-
.../client/ConsumerWindowSizeTest.java | 64 ++++++++++++++++++++
18 files changed, 222 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index dbc1e89..414def3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -147,6 +147,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
Boolean isExclusive();
Boolean isLastValue();
+
+ Integer getDefaultConsumerWindowSize();
}
// Lifecycle operations ------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 8dc35a9..d377d18 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -16,9 +16,9 @@
*/
package org.apache.activemq.artemis.core.client.impl;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.RoutingType;
public class QueueQueryImpl implements ClientSession.QueueQuery {
@@ -52,6 +52,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
private final Boolean lastValue;
+ private final Integer defaultConsumerWindowSize;
+
public QueueQueryImpl(final boolean durable,
final boolean temporary,
final int consumerCount,
@@ -88,7 +90,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final boolean autoCreated,
final boolean purgeOnNoConsumers,
final RoutingType routingType) {
- this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null);
+ this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, null, null, null);
}
public QueueQueryImpl(final boolean durable,
final boolean temporary,
@@ -104,7 +106,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
final boolean purgeOnNoConsumers,
final RoutingType routingType,
final Boolean exclusive,
- final Boolean lastValue) {
+ final Boolean lastValue,
+ final Integer defaultConsumerWindowSize) {
this.durable = durable;
this.temporary = temporary;
this.consumerCount = consumerCount;
@@ -120,6 +123,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
this.routingType = routingType;
this.exclusive = exclusive;
this.lastValue = lastValue;
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@Override
@@ -197,5 +201,9 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
return lastValue;
}
+ @Override
+ public Integer getDefaultConsumerWindowSize() {
+ return defaultConsumerWindowSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index fccb041..1268bba 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -16,9 +16,12 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
+
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.EnumSet;
@@ -29,6 +32,10 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -37,6 +44,7 @@ import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
@@ -85,6 +93,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionPro
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage_V3;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
@@ -115,12 +124,6 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
import org.jboss.logging.Logger;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.DISCONNECT_CONSUMER;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.EXCEPTION;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
-import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
-
public class ActiveMQSessionContext extends SessionContext {
private static final Logger logger = Logger.getLogger(ActiveMQSessionContext.class);
@@ -324,8 +327,9 @@ public class ActiveMQSessionContext extends SessionContext {
// The actual windows size that gets used is determined by the user since
// could be overridden on the queue settings
// The value we send is just a hint
+ final int consumerWindowSize = windowSize == ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE ? this.getDefaultConsumerWindowSize(queueInfo) : windowSize;
- return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(windowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
+ return new ClientConsumerImpl(session, consumerContext, queueName, filterString, browseOnly, calcWindowSize(consumerWindowSize), ackBatchSize, maxRate > 0 ? new TokenBucketLimiterImpl(maxRate, false) : null, executor, flowControlExecutor, this, queueInfo.toQueueQuery(), lookupTCCL());
}
@Override
@@ -822,6 +826,16 @@ public class ActiveMQSessionContext extends SessionContext {
}
}
+ @Override
+ public int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException {
+ if (response instanceof SessionQueueQueryResponseMessage_V3) {
+ final Integer defaultConsumerWindowSize = ((SessionQueueQueryResponseMessage_V3) response).getDefaultConsumerWindowSize();
+ return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+ } else {
+ return ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+ }
+ }
+
private Channel getCreateChannel() {
return getCoreConnection().getChannel(1, -1);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index b982744..0c4c40f 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -38,12 +38,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
protected Boolean lastValue;
+ protected Integer defaultConsumerWindowSize;
+
public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
- this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue());
+ this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue(), result.getDefaultConsumerWindowSize());
}
public SessionQueueQueryResponseMessage_V3() {
- this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null);
+ this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null, null);
}
private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -60,7 +62,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
final RoutingType routingType,
final int maxConsumers,
final Boolean exclusive,
- final Boolean lastValue) {
+ final Boolean lastValue,
+ final Integer defaultConsumerWindowSize) {
super(SESS_QUEUEQUERY_RESP_V3);
this.durable = durable;
@@ -92,6 +95,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.exclusive = exclusive;
this.lastValue = lastValue;
+
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
public boolean isAutoCreated() {
@@ -142,6 +147,14 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
this.lastValue = lastValue;
}
+ public Integer getDefaultConsumerWindowSize() {
+ return defaultConsumerWindowSize;
+ }
+
+ public void setDefaultConsumerWindowSize(Integer defaultConsumerWindowSize) {
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+ }
+
@Override
public void encodeRest(final ActiveMQBuffer buffer) {
super.encodeRest(buffer);
@@ -151,6 +164,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buffer.writeInt(maxConsumers);
BufferHelper.writeNullableBoolean(buffer, exclusive);
BufferHelper.writeNullableBoolean(buffer, lastValue);
+ BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
}
@Override
@@ -164,6 +178,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
exclusive = BufferHelper.readNullableBoolean(buffer);
lastValue = BufferHelper.readNullableBoolean(buffer);
}
+ if (buffer.readableBytes() > 0) {
+ defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
+ }
}
@Override
@@ -176,6 +193,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
result = prime * result + maxConsumers;
result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
+ result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
return result;
}
@@ -195,12 +213,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
buff.append(", maxConsumers=" + maxConsumers);
buff.append(", exclusive=" + exclusive);
buff.append(", lastValue=" + lastValue);
+ buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
return buff.toString();
}
@Override
public ClientSession.QueueQuery toQueueQuery() {
- return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue());
+ return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue(), getDefaultConsumerWindowSize());
}
@Override
@@ -226,6 +245,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
return false;
} else if (!lastValue.equals(other.lastValue))
return false;
+ if (defaultConsumerWindowSize == null) {
+ if (other.defaultConsumerWindowSize != null)
+ return false;
+ } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
+ return false;
if (routingType == null) {
if (other.routingType != null)
return false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index 6497e3f..b09d310 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -51,6 +51,8 @@ public class QueueQueryResult {
private Boolean lastValue;
+ private Integer defaultConsumerWindowSize;
+
public QueueQueryResult(final SimpleString name,
final SimpleString address,
final boolean durable,
@@ -65,7 +67,8 @@ public class QueueQueryResult {
final RoutingType routingType,
final int maxConsumers,
final Boolean exclusive,
- final Boolean lastValue) {
+ final Boolean lastValue,
+ final Integer defaultConsumerWindowSize) {
this.durable = durable;
this.temporary = temporary;
@@ -95,6 +98,8 @@ public class QueueQueryResult {
this.exclusive = exclusive;
this.lastValue = lastValue;
+
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
public boolean isExists() {
@@ -160,4 +165,8 @@ public class QueueQueryResult {
public Boolean isLastValue() {
return lastValue;
}
+
+ public Integer getDefaultConsumerWindowSize() {
+ return defaultConsumerWindowSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 6571335..0d86354 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal;
import org.apache.activemq.artemis.core.client.impl.ClientProducerCredits;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@@ -325,6 +326,8 @@ public abstract class SessionContext {
public abstract void resetMetadata(HashMap<String, String> metaDataToSend);
+ public abstract int getDefaultConsumerWindowSize(SessionQueueQueryResponseMessage response) throws ActiveMQException;
+
// Failover utility classes
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6c6a94c..dc5ba16 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -241,6 +241,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
+ private static final String DEFAULT_CONSUMER_WINDOW_SIZE = "default-consumer-window-size";
+
// Attributes ----------------------------------------------------
@@ -1068,6 +1070,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
Validators.ROUTING_TYPE.validate(DEFAULT_ADDRESS_ROUTING_TYPE, value);
RoutingType routingType = RoutingType.valueOf(value);
addressSettings.setDefaultAddressRoutingType(routingType);
+ } else if (DEFAULT_CONSUMER_WINDOW_SIZE.equalsIgnoreCase(name)) {
+ addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
}
}
return setting;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 8c2f748..6d72088 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -358,4 +358,6 @@ public interface ServerSession extends SecurityAuth {
int getConsumerCount();
int getProducerCount();
+
+ int getDefaultConsumerWindowSize(SimpleString address);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index a2cdf55..f01e0c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -865,11 +865,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
}
- boolean autoCreateQueues = getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateQueues();
- boolean defaultPurgeOnNoConsumers = getAddressSettingsRepository().getMatch(name.toString()).isDefaultPurgeOnNoConsumers();
- int defaultMaxConsumers = getAddressSettingsRepository().getMatch(name.toString()).getDefaultMaxConsumers();
- boolean defaultExclusiveQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultExclusiveQueue();
- boolean defaultLastValueQueue = getAddressSettingsRepository().getMatch(name.toString()).isDefaultLastValueQueue();
+ final AddressSettings addressSettings = getAddressSettingsRepository().getMatch(name.toString());
+
+ boolean autoCreateQueues = addressSettings.isAutoCreateQueues();
+ boolean defaultPurgeOnNoConsumers = addressSettings.isDefaultPurgeOnNoConsumers();
+ int defaultMaxConsumers = addressSettings.getDefaultMaxConsumers();
+ boolean defaultExclusiveQueue = addressSettings.isDefaultExclusiveQueue();
+ boolean defaultLastValueQueue = addressSettings.isDefaultLastValueQueue();
+ int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
QueueQueryResult response;
@@ -884,14 +887,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
SimpleString filterString = filter == null ? null : filter.getFilterString();
- response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue());
+ response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), defaultConsumerWindowSize);
} else if (name.equals(managementAddress)) {
// make an exception for the management address (see HORNETQ-29)
- response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false);
+ response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, defaultConsumerWindowSize);
} else if (autoCreateQueues) {
- response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue);
+ response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultConsumerWindowSize);
} else {
- response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null);
+ response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, 0, null, null, defaultConsumerWindowSize);
}
return response;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 7388e56..c0bca6b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1904,4 +1904,10 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
public int getProducerCount() {
return getServerProducers().size();
}
+
+ @Override
+ public int getDefaultConsumerWindowSize(SimpleString address) {
+ AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
+ return as.getDefaultConsumerWindowSize();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 6fc5019..f2eb488 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.settings.Mergeable;
@@ -178,6 +179,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private RoutingType defaultAddressRoutingType = null;
+ private Integer defaultConsumerWindowSize = null;
+
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@@ -222,6 +225,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch;
this.defaultQueueRoutingType = other.defaultQueueRoutingType;
this.defaultAddressRoutingType = other.defaultAddressRoutingType;
+ this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
}
public AddressSettings() {
@@ -580,6 +584,21 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
/**
+ * @return the defaultConsumerWindowSize
+ */
+ public int getDefaultConsumerWindowSize() {
+ return defaultConsumerWindowSize != null ? defaultConsumerWindowSize : ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;
+ }
+
+ /**
+ * @param defaultConsumerWindowSize the defaultConsumerWindowSize to set
+ */
+ public AddressSettings setDefaultConsumerWindowSize(int defaultConsumerWindowSize) {
+ this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+ return this;
+ }
+
+ /**
* merge 2 objects in to 1
*
* @param merged
@@ -694,6 +713,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (defaultExclusiveQueue == null) {
defaultExclusiveQueue = merged.defaultExclusiveQueue;
}
+ if (defaultConsumerWindowSize == null) {
+ defaultConsumerWindowSize = merged.defaultConsumerWindowSize;
+ }
if (defaultLastValueQueue == null) {
defaultLastValueQueue = merged.defaultLastValueQueue;
}
@@ -811,6 +833,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (buffer.readableBytes() > 0) {
defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer);
}
+
+ if (buffer.readableBytes() > 0) {
+ defaultConsumerWindowSize = BufferHelper.readNullableInteger(buffer);
+ }
}
@Override
@@ -851,7 +877,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
DataConstants.SIZE_BYTE +
BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) +
BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) +
- BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch);
+ BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch) +
+ BufferHelper.sizeOfNullableInteger(defaultConsumerWindowSize);
}
@Override
@@ -932,6 +959,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch);
+ BufferHelper.writeNullableInteger(buffer, defaultConsumerWindowSize);
+
}
/* (non-Javadoc)
@@ -980,6 +1009,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode());
result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
+ result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
return result;
}
@@ -1197,6 +1227,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return false;
} else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch))
return false;
+
+ if (defaultConsumerWindowSize == null) {
+ if (other.defaultConsumerWindowSize != null)
+ return false;
+ } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
+ return false;
return true;
}
@@ -1280,6 +1316,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
defaultConsumersBeforeDispatch +
", defaultDelayBeforeDispatch=" +
defaultDelayBeforeDispatch +
+ ", defaultClientWindowSize=" +
+ defaultConsumerWindowSize +
"]";
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index e96923d..04e2931 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3011,6 +3011,14 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="default-consumer-window-size" type="xsd:int" default="1048576" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ the default window size for a consumer, in bytes; 1 MiB (1024 * 1024 bytes) if not defined
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
</xsd:all>
<xsd:attribute name="match" type="xsd:string" use="required">
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 8fcac20..114779d 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -346,6 +346,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(15, conf.getAddressesSettings().get("a2").getDefaultMaxConsumers());
assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
+ assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index eb561fc..186a712 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -337,6 +337,7 @@
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
<default-address-routing-type>ANYCAST</default-address-routing-type>
+ <default-consumer-window-size>10000</default-consumer-window-size>
</address-setting>
</address-settings>
<resource-limit-settings>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 9e1ca40..443958e 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -62,5 +62,6 @@
<default-max-consumers>15</default-max-consumers>
<default-queue-routing-type>MULTICAST</default-queue-routing-type>
<default-address-routing-type>ANYCAST</default-address-routing-type>
+ <default-consumer-window-size>10000</default-consumer-window-size>
</address-setting>
</address-settings>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/docs/user-manual/en/address-model.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index f73adf2..ac5e49b 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -790,3 +790,8 @@ types](#routing-type).
address if the broker is unable to determine the routing-type based on the
client and/or protocol semantics. Default is `MULTICAST`. Read more about
[routing types](#routing-type).
+
+`default-consumer-window-size` defines the default `consumerWindowSize` value
+for a `CORE` protocol consumer. If it is not defined, the default is
+1 MiB (1024 * 1024 bytes). The consumer uses this value as its window size
+when no value is set on the client. Read more about [flow control](flow-control.md).
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/docs/user-manual/en/flow-control.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md
index 44fbee3..0151079 100644
--- a/docs/user-manual/en/flow-control.md
+++ b/docs/user-manual/en/flow-control.md
@@ -33,7 +33,7 @@ buffered on each consumer is determined by the `consumerWindowSize`
parameter.
By default, the `consumerWindowSize` is set to 1 MiB (1024 \* 1024
-bytes).
+bytes) unless overridden via [Address Settings](address-model.md#configuring-addresses-and-queues-via-address-settings).
The value can be:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fc60d74/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
index d3adee0..d4298e5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
@@ -24,7 +24,9 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -32,6 +34,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -1386,4 +1389,65 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testDefaultConsumerWindowSize() throws Exception {
+ // No address settings are registered here, so the client default is expected.
+ final ActiveMQServer broker = createServer(false, isNetty());
+ broker.start();
+ broker.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
+
+ final ClientSessionFactory factory = createSessionFactory(locator);
+ final ClientSession clientSession = factory.createSession(false, true, true);
+ final ClientConsumerImpl windowed = (ClientConsumerImpl) clientSession.createConsumer(queueA);
+ windowed.start();
+
+ final int expectedWindow = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE / 2;
+ assertEquals(expectedWindow, windowed.getClientWindowSize());
+ }
+
+ @Test
+ public void testConsumerWindowSizeAddressSettings() throws Exception {
+ // Window size registered for the exact address the queue is created on.
+ final int configuredWindow = 1024 * 5;
+ final ActiveMQServer broker = createServer(false, isNetty());
+ final AddressSettings exactMatch = new AddressSettings().setDefaultConsumerWindowSize(configuredWindow);
+ broker.getConfiguration().getAddressesSettings().put(queueA.toString(), exactMatch);
+
+ // Settings are registered before the broker is started.
+ broker.start();
+ broker.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
+
+ final ClientSessionFactory factory = createSessionFactory(locator);
+ final ClientSession clientSession = factory.createSession(false, true, true);
+ final ClientConsumerImpl windowed = (ClientConsumerImpl) clientSession.createConsumer(queueA);
+ clientSession.start();
+
+ // Expect half of the value supplied through the address settings.
+ final int expectedWindow = configuredWindow / 2;
+ assertEquals(expectedWindow, windowed.getClientWindowSize());
+ }
+
+ @Test
+ public void testConsumerWindowSizeAddressSettingsWildCard() throws Exception {
+ // Window size registered under the "#" match instead of the queue's own address.
+ final int configuredWindow = 1024 * 5;
+ final ActiveMQServer broker = createServer(false, isNetty());
+ final AddressSettings catchAll = new AddressSettings().setDefaultConsumerWindowSize(configuredWindow);
+ broker.getConfiguration().getAddressesSettings().put("#", catchAll);
+
+ // Settings are registered before the broker is started.
+ broker.start();
+ broker.createQueue(queueA, RoutingType.ANYCAST, queueA, null, true, false);
+
+ final ClientSessionFactory factory = createSessionFactory(locator);
+ final ClientSession clientSession = factory.createSession(false, true, true);
+ final ClientConsumerImpl firstConsumer = (ClientConsumerImpl) clientSession.createConsumer(queueA);
+ final ClientConsumerImpl secondConsumer = (ClientConsumerImpl) clientSession.createConsumer(queueA);
+ clientSession.start();
+
+ // Both consumers on the queue are expected to report half of the configured value.
+ final int expectedWindow = configuredWindow / 2;
+ assertEquals(expectedWindow, firstConsumer.getClientWindowSize());
+ assertEquals(expectedWindow, secondConsumer.getClientWindowSize());
+ }
}