You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/05 14:35:31 UTC
[1/3] activemq-artemis git commit: ARTEMIS-636 Add AMQP Hard Soft
Limit for BLOCK
Repository: activemq-artemis
Updated Branches:
refs/heads/master d871dfe62 -> 410cd91f6
ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f721866
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f721866
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f721866
Branch: refs/heads/master
Commit: 2f721866ab982d56c488ed124cc191cf5f627e42
Parents: 06fb4a1
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Jul 27 13:36:08 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Aug 5 15:29:01 2016 +0100
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 30 +++++++---
.../plug/context/ProtonTransactionHandler.java | 29 ++++++----
.../artemis/core/paging/PagingStore.java | 2 +
.../core/paging/impl/PagingStoreImpl.java | 12 ++++
.../core/settings/impl/AddressSettings.java | 35 +++++++++++-
.../resources/schema/artemis-configuration.xsd | 10 +++-
.../core/settings/AddressSettingsTest.java | 5 ++
docs/user-manual/en/flow-control.md | 38 ++++++-------
.../transport/amqp/client/AmqpSession.java | 2 +-
.../amqp/client/AmqpTransactionContext.java | 2 +-
.../tests/integration/proton/ProtonTest.java | 60 +++++++++++++++++++-
.../storage/PersistMultiThreadTest.java | 5 ++
12 files changed, 187 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index b00474d..a00af71 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback;
@@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -56,6 +56,7 @@ import org.proton.plug.AMQPSessionCallback;
import org.proton.plug.AMQPSessionContext;
import org.proton.plug.SASLResult;
import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
import org.proton.plug.sasl.PlainSASLResult;
public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback {
@@ -351,18 +352,33 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
recoverContext();
PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
- if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK) {
- ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + message.getAddress());
- Rejected rejected = new Rejected();
- rejected.setError(ec);
- delivery.disposition(rejected);
- connection.flush();
+ if (store.isRejectingMessages()) {
+ // We drop pre-settled messages (and abort any associated Tx)
+ if (delivery.remotelySettled()) {
+ if (serverSession.getCurrentTransaction() != null) {
+ String amqpAddress = delivery.getLink().getTarget().getAddress();
+ ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
+ serverSession.getCurrentTransaction().markAsRollbackOnly(e);
+ }
+ }
+ else {
+ rejectMessage(delivery);
+ }
}
else {
serverSend(message, delivery, receiver);
}
}
+ private void rejectMessage(Delivery delivery) {
+ String address = delivery.getLink().getTarget().getAddress();
+ ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
+ Rejected rejected = new Rejected();
+ rejected.setError(ec);
+ delivery.disposition(rejected);
+ connection.flush();
+ }
+
private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
try {
serverSession.send(message, false);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index dbf6f38..c8fb994 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -91,40 +91,49 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
try {
sessionSPI.commitCurrentTX();
}
+ catch (ActiveMQAMQPException amqpE) {
+ throw amqpE;
+ }
catch (Exception e) {
throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
}
}
- delivery.settle();
}
}
+ catch (ActiveMQAMQPException amqpE) {
+ delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+ }
catch (Exception e) {
log.warn(e.getMessage(), e);
- Rejected rejected = new Rejected();
- ErrorCondition condition = new ErrorCondition();
- condition.setCondition(Symbol.valueOf("failed"));
- condition.setDescription(e.getMessage());
- rejected.setError(condition);
- delivery.disposition(rejected);
+ delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
}
finally {
+ delivery.settle();
buffer.release();
}
}
+ private Rejected createRejected(Symbol amqpError, String message) {
+ Rejected rejected = new Rejected();
+ ErrorCondition condition = new ErrorCondition();
+ condition.setCondition(amqpError);
+ condition.setDescription(message);
+ rejected.setError(condition);
+ return rejected;
+ }
+
@Override
public void onFlow(int credits, boolean drain) {
-
}
@Override
public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
- //noop
+ // no op
}
@Override
public void close(ErrorCondition condition) throws ActiveMQAMQPException {
- //noop
+ // no op
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 566b91a..79fb115 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -128,6 +128,8 @@ public interface PagingStore extends ActiveMQComponent {
boolean isFull();
+ boolean isRejectingMessages();
+
/**
* Write lock the PagingStore.
*
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f57f1b8..7e6cda8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -123,6 +123,8 @@ public class PagingStoreImpl implements PagingStore {
private volatile AtomicBoolean blocking = new AtomicBoolean(false);
+ private long rejectThreshold;
+
public PagingStoreImpl(final SimpleString address,
final ScheduledExecutorService scheduledExecutor,
final long syncTimeout,
@@ -187,6 +189,8 @@ public class PagingStoreImpl implements PagingStore {
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
+ rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
if (cursorProvider != null) {
cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
}
@@ -1073,6 +1077,14 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
+ public boolean isRejectingMessages() {
+ if (addressFullMessagePolicy != AddressFullMessagePolicy.BLOCK) {
+ return false;
+ }
+ return rejectThreshold != AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD && getAddressSize() > rejectThreshold;
+ }
+
+ @Override
public Collection<Integer> getCurrentIds() throws Exception {
List<Integer> ids = new ArrayList<>();
if (fileFactory != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 642574b..f5f00f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -76,6 +76,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
public static final int DEFAULT_QUEUE_PREFETCH = 1000;
+ // Default address drop threshold, applied to address settings with BLOCK policy. -1 means no threshold enabled.
+ public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1;
+
private AddressFullMessagePolicy addressFullMessagePolicy = null;
private Long maxSizeBytes = null;
@@ -124,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
+ private Long maxSizeBytesRejectThreshold = null;
+
//from amq5
//make it transient
private transient Integer queuePrefetch = null;
@@ -154,6 +159,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
this.managementBrowsePageSize = other.managementBrowsePageSize;
this.queuePrefetch = other.queuePrefetch;
+ this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
}
public AddressSettings() {
@@ -377,6 +383,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
return this;
}
+ public long getMaxSizeBytesRejectThreshold() {
+ return (maxSizeBytesRejectThreshold == null) ? AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD : maxSizeBytesRejectThreshold;
+ }
+
+ public AddressSettings setMaxSizeBytesRejectThreshold(long maxSizeBytesRejectThreshold) {
+ this.maxSizeBytesRejectThreshold = maxSizeBytesRejectThreshold;
+ return this;
+ }
+
/**
* merge 2 objects in to 1
*
@@ -456,6 +471,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
if (queuePrefetch == null) {
queuePrefetch = merged.queuePrefetch;
}
+ if (maxSizeBytesRejectThreshold == null) {
+ maxSizeBytesRejectThreshold = merged.maxSizeBytesRejectThreshold;
+ }
}
@Override
@@ -521,6 +539,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer);
managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
+
+ maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
}
@Override
@@ -549,7 +569,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
- BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
+ BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
+ BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold);
}
@Override
@@ -601,6 +622,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics);
BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
+
+ BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
}
/* (non-Javadoc)
@@ -635,6 +658,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
+ result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : queuePrefetch.hashCode());
return result;
}
@@ -802,6 +826,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
}
else if (!queuePrefetch.equals(other.queuePrefetch))
return false;
+
+ if (maxSizeBytesRejectThreshold == null) {
+ if (other.maxSizeBytesRejectThreshold != null)
+ return false;
+ }
+ else if (!maxSizeBytesRejectThreshold.equals(other.maxSizeBytesRejectThreshold))
+ return false;
return true;
}
@@ -825,6 +856,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
maxDeliveryAttempts +
", maxSizeBytes=" +
maxSizeBytes +
+ ", maxSizeBytesRejectThreshold=" +
+ maxSizeBytesRejectThreshold +
", messageCounterHistoryDayLimit=" +
messageCounterHistoryDayLimit +
", pageSizeBytes=" +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5ac86a0..815ef7c 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2220,7 +2220,15 @@
<xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
- the maximum size (in bytes) to use in paging for an address (-1 means no limits)
+ the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
+ <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit).
</xsd:documentation>
</xsd:annotation>
</xsd:element>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
index 58f7c99..202f2ba 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
@@ -59,6 +59,8 @@ public class AddressSettingsTest extends ActiveMQTestBase {
addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
addressSettingsToMerge.setRedeliveryDelay(1003);
addressSettingsToMerge.setPageSizeBytes(1004);
+ addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
+
addressSettings.merge(addressSettingsToMerge);
Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
Assert.assertEquals(addressSettings.getExpiryAddress(), exp);
@@ -68,6 +70,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003);
Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004);
Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
+ Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
}
@Test
@@ -82,6 +85,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
addressSettingsToMerge.setMaxSizeBytes(1001);
addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
+ addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
addressSettings.merge(addressSettingsToMerge);
AddressSettings addressSettingsToMerge2 = new AddressSettings();
@@ -100,6 +104,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
Assert.assertEquals(addressSettings.getRedeliveryDelay(), 2003);
Assert.assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001);
Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
+ Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
}
@Test
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/docs/user-manual/en/flow-control.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md
index c1b4035..8a11966 100644
--- a/docs/user-manual/en/flow-control.md
+++ b/docs/user-manual/en/flow-control.md
@@ -275,25 +275,25 @@ control.
#### Blocking producer window based flow control using AMQP
-Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support
-flow control. Artemis CORE protocol and AMQP. Both protocols implement flow
-control slightly differently and therefore address full BLOCK policy behaves
-slightly different for clients uses each protocol respectively.
-
-As explained earlier in this chapter the CORE protocol uses a producer window size
-flow control system. Where credits (representing bytes) are allocated to producers,
-if a producer wants to send a message it should wait until it has enough bytes available
-to send it. AMQP flow control credits are not representative of bytes but instead represent
-the number of messages a producer is permitted to send (regardless of size).
-
-BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis
-will issue 100 credits to a client at a time and refresh them when the clients credits reaches 30.
-The broker will stop issuing credits once an address is full. However, since AMQP credits represent
-whole messages and not bytes, it would be possible for an AMQP client to significantly exceed an
-address upper bound should the broker continue accepting messages until the clients credits are exhausted.
-For this reason once an address has reached it's upper bound and is blocked (when using AMQP) Artemis
-will start rejecting messages until the address becomes unblocked. This should be taken into consideration when writing
-application code.
+Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support flow control. Artemis CORE protocol and
+AMQP. Both protocols implement flow control slightly differently and therefore address full BLOCK policy behaves slightly
+different for clients that use each protocol respectively.
+
+As explained earlier in this chapter the CORE protocol uses a producer window size flow control system. Where credits
+(representing bytes) are allocated to producers, if a producer wants to send a message it should wait until it has
+enough byte credits available for it to send. AMQP flow control credits are not representative of bytes but instead
+represent the number of messages a producer is permitted to send (regardless of the message size).
+
+BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis will issue 100 credits
+to a client at a time and refresh them when the clients credits reaches 30. The broker will stop issuing credits once an
+address is full. However, since AMQP credits represent whole messages and not bytes, it would be possible in some
+scenarios for an AMQP client to significantly exceed an address upper bound should the broker continue accepting
+messages until the clients credits are exhausted. For this reason there is an additional parameter available on address
+settings that specifies an upper bound on an address size in bytes. Once this upper bound is reach Artemis will start
+rejecting AMQP messages. This limit is the max-size-bytes-reject-threshold and is by default set to -1 (or no limit).
+This is additional parameter allows a kind of soft and hard limit, in normal circumstances the broker will utilize the
+max-size-bytes parameter using using flow control to put back pressure on the client, but will protect the broker by
+rejecting messages once the address size is reached.
### Rate limited flow control
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 28e38f2..82b6aec 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -412,7 +412,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
return txContext.getTransactionId();
}
- AmqpTransactionContext getTransactionContext() {
+ public AmqpTransactionContext getTransactionContext() {
return txContext;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
index dcf23d2..2f3e22a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
@@ -213,7 +213,7 @@ public class AmqpTransactionContext {
//----- Internal access to context properties ----------------------------//
- AmqpTransactionCoordinator getCoordinator() {
+ public AmqpTransactionCoordinator getCoordinator() {
return coordinator;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b170f82..785543d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -95,8 +95,13 @@ public class ProtonTest extends ActiveMQTestBase {
private static final String password = "guest";
+
private static final String brokerName = "my-broker";
+ private static final long maxSizeBytes = 1 * 1024 * 1024;
+
+ private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
+
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
@@ -310,6 +315,7 @@ public class ProtonTest extends ActiveMQTestBase {
Assert.assertEquals(q.getMessageCount(), 0);
}
+
@Test
public void testRollbackConsumer() throws Throwable {
@@ -342,8 +348,11 @@ public class ProtonTest extends ActiveMQTestBase {
public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
setAddressFullBlockPolicy();
+ String destinationAddress = address + 1;
+ fillAddress(destinationAddress);
- fillAddress(address + 1);
+ long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+ assertTrue(addressSize >= maxSizeBytesRejectThreshold);
}
@Test
@@ -367,6 +376,9 @@ public class ProtonTest extends ActiveMQTestBase {
}
assertTrue(e instanceof ResourceAllocationException);
assertTrue(e.getMessage().contains("resource-limit-exceeded"));
+
+ long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+ assertTrue(addressSize >= maxSizeBytesRejectThreshold);
}
@Test
@@ -393,6 +405,9 @@ public class ProtonTest extends ActiveMQTestBase {
// This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
assertTrue(sender.getSender().getCredit() == -1);
+
+ long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+ assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
}
finally {
amqpConnection.close();
@@ -446,7 +461,7 @@ public class ProtonTest extends ActiveMQTestBase {
fillAddress(address + 1);
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
- AmqpConnection amqpConnection = amqpConnection = client.connect();
+ AmqpConnection amqpConnection = client.connect();
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address + 1);
@@ -459,6 +474,43 @@ public class ProtonTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
+ if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ setAddressFullBlockPolicy();
+
+ // Create the link attach before filling the address to ensure the link is allocated credit.
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpConnection amqpConnection = client.connect();
+
+ AmqpSession session = amqpConnection.createSession();
+ AmqpSender sender = session.createSender(address);
+ sender.setPresettle(true);
+
+ fillAddress(address);
+
+ final AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[50 * 1024];
+ message.setBytes(payload);
+
+ Exception expectedException = null;
+ try {
+ session.begin();
+ sender.send(message);
+ session.commit();
+ }
+ catch (Exception e) {
+ expectedException = e;
+ }
+ finally {
+ amqpConnection.close();
+ }
+
+ assertNotNull(expectedException);
+ assertTrue(expectedException.getMessage().contains("resource-limit-exceeded"));
+ assertTrue(expectedException.getMessage().contains("Address is full: " + address));
+ }
+
/**
* Fills an address. Careful when using this method. Only use when rejected messages are switched on.
* @param address
@@ -520,6 +572,7 @@ public class ProtonTest extends ActiveMQTestBase {
timeout.await(5, TimeUnit.SECONDS);
+ System.out.println("Messages Sent: " + sentMessages);
if (errors[0] != null) {
throw errors[0];
}
@@ -1313,7 +1366,8 @@ public class ProtonTest extends ActiveMQTestBase {
// For BLOCK tests
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+ addressSettings.setMaxSizeBytes(maxSizeBytes);
+ addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold);
server.getAddressSettingsRepository().addMatch("#", addressSettings);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 33ee0c7..6c42413 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -307,6 +307,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
+ public boolean isRejectingMessages() {
+ return false;
+ }
+
+ @Override
public void applySetting(AddressSettings addressSettings) {
}
[3/3] activemq-artemis git commit: This closes #690
Posted by cl...@apache.org.
This closes #690
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/410cd91f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/410cd91f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/410cd91f
Branch: refs/heads/master
Commit: 410cd91f6f554dcb34f9529f117fc0052e29e5e1
Parents: d871dfe 2f72186
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 5 10:35:01 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 5 10:35:01 2016 -0400
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQException.java | 10 ++++
.../artemis/api/core/ActiveMQExceptionType.java | 3 +-
.../plug/ProtonSessionIntegrationCallback.java | 30 +++++++---
artemis-protocols/artemis-proton-plug/pom.xml | 5 ++
.../plug/context/ProtonTransactionHandler.java | 29 ++++++----
.../plug/exceptions/ActiveMQAMQPException.java | 12 ++--
.../ActiveMQAMQPIllegalStateException.java | 3 +-
.../ActiveMQAMQPInternalErrorException.java | 5 +-
.../ActiveMQAMQPInvalidFieldException.java | 3 +-
.../ActiveMQAMQPNotFoundException.java | 7 +--
.../ActiveMQAMQPNotImplementedException.java | 5 +-
...iveMQAMQPResourceLimitExceededException.java | 27 +++++++++
.../ActiveMQAMQPTimeoutException.java | 3 +-
.../artemis/core/paging/PagingStore.java | 2 +
.../core/paging/impl/PagingStoreImpl.java | 12 ++++
.../core/settings/impl/AddressSettings.java | 35 +++++++++++-
.../resources/schema/artemis-configuration.xsd | 10 +++-
.../core/settings/AddressSettingsTest.java | 5 ++
docs/user-manual/en/flow-control.md | 38 ++++++-------
.../transport/amqp/client/AmqpSession.java | 2 +-
.../amqp/client/AmqpTransactionContext.java | 2 +-
.../tests/integration/proton/ProtonTest.java | 60 +++++++++++++++++++-
.../storage/PersistMultiThreadTest.java | 5 ++
23 files changed, 252 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-667 Make AMQP Exceptions
extend ActiveMQException
Posted by cl...@apache.org.
ARTEMIS-667 Make AMQP Exceptions extend ActiveMQException
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/06fb4a12
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/06fb4a12
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/06fb4a12
Branch: refs/heads/master
Commit: 06fb4a1234887737b9e4d02c75e51836d286023a
Parents: d871dfe
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Aug 4 13:58:30 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Aug 5 15:29:01 2016 +0100
----------------------------------------------------------------------
.../artemis/api/core/ActiveMQException.java | 10 ++++++++
.../artemis/api/core/ActiveMQExceptionType.java | 3 ++-
artemis-protocols/artemis-proton-plug/pom.xml | 5 ++++
.../plug/exceptions/ActiveMQAMQPException.java | 12 +++++----
.../ActiveMQAMQPIllegalStateException.java | 3 ++-
.../ActiveMQAMQPInternalErrorException.java | 5 ++--
.../ActiveMQAMQPInvalidFieldException.java | 3 ++-
.../ActiveMQAMQPNotFoundException.java | 7 ++---
.../ActiveMQAMQPNotImplementedException.java | 5 ++--
...iveMQAMQPResourceLimitExceededException.java | 27 ++++++++++++++++++++
.../ActiveMQAMQPTimeoutException.java | 3 ++-
11 files changed, 65 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
index 13a35b1..6404c74 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
@@ -34,6 +34,16 @@ public class ActiveMQException extends Exception {
type = ActiveMQExceptionType.GENERIC_EXCEPTION;
}
+ public ActiveMQException(String msg, ActiveMQExceptionType t) {
+ super(msg);
+ type = t;
+ }
+
+ public ActiveMQException(String message, Throwable t, ActiveMQExceptionType type) {
+ super(message, t);
+ this.type = type;
+ }
+
/*
* This constructor is needed only for the native layer
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index f135653..eb4bf5d 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -201,7 +201,8 @@ public enum ActiveMQExceptionType {
return new ActiveMQClusterSecurityException(msg);
}
- };
+ },
+ NOT_IMPLEMTNED_EXCEPTION(213);
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/pom.xml b/artemis-protocols/artemis-proton-plug/pom.xml
index 684b3ce..b4876cf 100644
--- a/artemis-protocols/artemis-proton-plug/pom.xml
+++ b/artemis-protocols/artemis-proton-plug/pom.xml
@@ -77,6 +77,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>artemis-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
index 6e240e3..4838d55 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
@@ -16,9 +16,11 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.Symbol;
-public class ActiveMQAMQPException extends Exception {
+public class ActiveMQAMQPException extends ActiveMQException {
private static final String ERROR_PREFIX = "amqp:";
@@ -28,13 +30,13 @@ public class ActiveMQAMQPException extends Exception {
private final Symbol amqpError;
- public ActiveMQAMQPException(Symbol amqpError, String message, Throwable e) {
- super(message, e);
+ public ActiveMQAMQPException(Symbol amqpError, String message, Throwable e, ActiveMQExceptionType t) {
+ super(message, e, t);
this.amqpError = amqpError;
}
- public ActiveMQAMQPException(Symbol amqpError, String message) {
- super(message);
+ public ActiveMQAMQPException(Symbol amqpError, String message, ActiveMQExceptionType t) {
+ super(message, t);
this.amqpError = amqpError;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
index cdbf4fa..7818ef9 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
@@ -16,11 +16,12 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPIllegalStateException extends ActiveMQAMQPException {
public ActiveMQAMQPIllegalStateException(String message) {
- super(AmqpError.ILLEGAL_STATE, message);
+ super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.ILLEGAL_STATE);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
index e30073c..2c0b0ae 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
@@ -16,15 +16,16 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPInternalErrorException extends ActiveMQAMQPException {
public ActiveMQAMQPInternalErrorException(String message, Throwable e) {
- super(AmqpError.INTERNAL_ERROR, message, e);
+ super(AmqpError.INTERNAL_ERROR, message, e, ActiveMQExceptionType.INTERNAL_ERROR);
}
public ActiveMQAMQPInternalErrorException(String message) {
- super(AmqpError.INTERNAL_ERROR, message);
+ super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
index c6978a2..f5dd168 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
@@ -16,11 +16,12 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPInvalidFieldException extends ActiveMQAMQPException {
public ActiveMQAMQPInvalidFieldException(String message) {
- super(AmqpError.INVALID_FIELD, message);
+ super(AmqpError.INVALID_FIELD, message, ActiveMQExceptionType.ILLEGAL_STATE);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
index dc4c400..02cc15c 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
@@ -16,15 +16,12 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPNotFoundException extends ActiveMQAMQPException {
- public ActiveMQAMQPNotFoundException(String message, Throwable e) {
- super(AmqpError.NOT_FOUND, message, e);
- }
-
public ActiveMQAMQPNotFoundException(String message) {
- super(AmqpError.NOT_FOUND, message);
+ super(AmqpError.NOT_FOUND, message, ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
index 6a1c95c..861e236 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
@@ -16,11 +16,12 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPNotImplementedException extends ActiveMQAMQPException {
public ActiveMQAMQPNotImplementedException(String message) {
- super(AmqpError.NOT_IMPLEMENTED, message);
+ super(AmqpError.NOT_IMPLEMENTED, message, ActiveMQExceptionType.NOT_IMPLEMTNED_EXCEPTION);
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
new file mode 100644
index 0000000..2c64a8d
--- /dev/null
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.proton.plug.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPResourceLimitExceededException extends ActiveMQAMQPException {
+
+ public ActiveMQAMQPResourceLimitExceededException(String message) {
+ super(AmqpError.RESOURCE_LIMIT_EXCEEDED, message, ActiveMQExceptionType.ADDRESS_FULL);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
index 25b4ea6..c86c25d 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
@@ -16,12 +16,13 @@
*/
package org.proton.plug.exceptions;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.qpid.proton.amqp.transport.AmqpError;
public class ActiveMQAMQPTimeoutException extends ActiveMQAMQPException {
public ActiveMQAMQPTimeoutException(String message) {
- super(AmqpError.ILLEGAL_STATE, message);
+ super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.CONNECTION_TIMEDOUT);
}
}