You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/05 14:35:31 UTC

[1/3] activemq-artemis git commit: ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK

Repository: activemq-artemis
Updated Branches:
  refs/heads/master d871dfe62 -> 410cd91f6


ARTEMIS-636 Add AMQP Hard Soft Limit for BLOCK


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f721866
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f721866
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f721866

Branch: refs/heads/master
Commit: 2f721866ab982d56c488ed124cc191cf5f627e42
Parents: 06fb4a1
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Jul 27 13:36:08 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Aug 5 15:29:01 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  | 30 +++++++---
 .../plug/context/ProtonTransactionHandler.java  | 29 ++++++----
 .../artemis/core/paging/PagingStore.java        |  2 +
 .../core/paging/impl/PagingStoreImpl.java       | 12 ++++
 .../core/settings/impl/AddressSettings.java     | 35 +++++++++++-
 .../resources/schema/artemis-configuration.xsd  | 10 +++-
 .../core/settings/AddressSettingsTest.java      |  5 ++
 docs/user-manual/en/flow-control.md             | 38 ++++++-------
 .../transport/amqp/client/AmqpSession.java      |  2 +-
 .../amqp/client/AmqpTransactionContext.java     |  2 +-
 .../tests/integration/proton/ProtonTest.java    | 60 +++++++++++++++++++-
 .../storage/PersistMultiThreadTest.java         |  5 ++
 12 files changed, 187 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index b00474d..a00af71 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.core.io.IOCallback;
@@ -32,7 +33,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
-import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
@@ -56,6 +56,7 @@ import org.proton.plug.AMQPSessionCallback;
 import org.proton.plug.AMQPSessionContext;
 import org.proton.plug.SASLResult;
 import org.proton.plug.context.ProtonPlugSender;
+import org.proton.plug.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import org.proton.plug.sasl.PlainSASLResult;
 
 public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback {
@@ -351,18 +352,33 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
       recoverContext();
 
       PagingStore store = manager.getServer().getPagingManager().getPageStore(message.getAddress());
-      if (store.isFull() && store.getAddressFullMessagePolicy() == AddressFullMessagePolicy.BLOCK) {
-         ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + message.getAddress());
-         Rejected rejected = new Rejected();
-         rejected.setError(ec);
-         delivery.disposition(rejected);
-         connection.flush();
+      if (store.isRejectingMessages()) {
+         // We drop pre-settled messages (and abort any associated Tx)
+         if (delivery.remotelySettled()) {
+            if (serverSession.getCurrentTransaction() != null) {
+               String amqpAddress = delivery.getLink().getTarget().getAddress();
+               ActiveMQException e = new ActiveMQAMQPResourceLimitExceededException("Address is full: " + amqpAddress);
+               serverSession.getCurrentTransaction().markAsRollbackOnly(e);
+            }
+         }
+         else {
+            rejectMessage(delivery);
+         }
       }
       else {
          serverSend(message, delivery, receiver);
       }
    }
 
+   private void rejectMessage(Delivery delivery) {
+      String address = delivery.getLink().getTarget().getAddress();
+      ErrorCondition ec = new ErrorCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED, "Address is full: " + address);
+      Rejected rejected = new Rejected();
+      rejected.setError(ec);
+      delivery.disposition(rejected);
+      connection.flush();
+   }
+
    private void serverSend(final ServerMessage message, final Delivery delivery, final Receiver receiver) throws Exception {
       try {
          serverSession.send(message, false);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
index dbf6f38..c8fb994 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/ProtonTransactionHandler.java
@@ -91,40 +91,49 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler {
                try {
                   sessionSPI.commitCurrentTX();
                }
+               catch (ActiveMQAMQPException amqpE) {
+                  throw amqpE;
+               }
                catch (Exception e) {
                   throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage());
                }
             }
-            delivery.settle();
          }
 
       }
+      catch (ActiveMQAMQPException amqpE) {
+         delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage()));
+      }
       catch (Exception e) {
          log.warn(e.getMessage(), e);
-         Rejected rejected = new Rejected();
-         ErrorCondition condition = new ErrorCondition();
-         condition.setCondition(Symbol.valueOf("failed"));
-         condition.setDescription(e.getMessage());
-         rejected.setError(condition);
-         delivery.disposition(rejected);
+         delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage()));
       }
       finally {
+         delivery.settle();
          buffer.release();
       }
    }
 
+   private Rejected createRejected(Symbol amqpError, String message) {
+      Rejected rejected = new Rejected();
+      ErrorCondition condition = new ErrorCondition();
+      condition.setCondition(amqpError);
+      condition.setDescription(message);
+      rejected.setError(condition);
+      return rejected;
+   }
+
    @Override
    public void onFlow(int credits, boolean drain) {
-
    }
 
    @Override
    public void close(boolean linkRemoteClose) throws ActiveMQAMQPException {
-      //noop
+      // no op
    }
 
    @Override
    public void close(ErrorCondition condition) throws ActiveMQAMQPException {
-      //noop
+      // no op
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 566b91a..79fb115 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -128,6 +128,8 @@ public interface PagingStore extends ActiveMQComponent {
 
    boolean isFull();
 
+   boolean isRejectingMessages();
+
    /**
     * Write lock the PagingStore.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index f57f1b8..7e6cda8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -123,6 +123,8 @@ public class PagingStoreImpl implements PagingStore {
 
    private volatile AtomicBoolean blocking = new AtomicBoolean(false);
 
+   private long rejectThreshold;
+
    public PagingStoreImpl(final SimpleString address,
                           final ScheduledExecutorService scheduledExecutor,
                           final long syncTimeout,
@@ -187,6 +189,8 @@ public class PagingStoreImpl implements PagingStore {
 
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
 
+      rejectThreshold = addressSettings.getMaxSizeBytesRejectThreshold();
+
       if (cursorProvider != null) {
          cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
       }
@@ -1073,6 +1077,14 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
+   public boolean isRejectingMessages() {
+      if (addressFullMessagePolicy != AddressFullMessagePolicy.BLOCK) {
+         return false;
+      }
+      return rejectThreshold != AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD && getAddressSize() > rejectThreshold;
+   }
+
+   @Override
    public Collection<Integer> getCurrentIds() throws Exception {
       List<Integer> ids = new ArrayList<>();
       if (fileFactory != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 642574b..f5f00f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -76,6 +76,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    public static final int DEFAULT_QUEUE_PREFETCH = 1000;
 
+   // Default address reject threshold, applied to address settings with BLOCK policy.  -1 means no threshold enabled.
+   public static final long DEFAULT_ADDRESS_REJECT_THRESHOLD = -1;
+
    private AddressFullMessagePolicy addressFullMessagePolicy = null;
 
    private Long maxSizeBytes = null;
@@ -124,6 +127,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Integer managementBrowsePageSize = AddressSettings.MANAGEMENT_BROWSE_PAGE_SIZE;
 
+   private Long maxSizeBytesRejectThreshold = null;
+
    //from amq5
    //make it transient
    private transient Integer queuePrefetch = null;
@@ -154,6 +159,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.autoDeleteJmsTopics = other.autoDeleteJmsTopics;
       this.managementBrowsePageSize = other.managementBrowsePageSize;
       this.queuePrefetch = other.queuePrefetch;
+      this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold;
    }
 
    public AddressSettings() {
@@ -377,6 +383,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public long getMaxSizeBytesRejectThreshold() {
+      return (maxSizeBytesRejectThreshold == null) ? AddressSettings.DEFAULT_ADDRESS_REJECT_THRESHOLD : maxSizeBytesRejectThreshold;
+   }
+
+   public AddressSettings setMaxSizeBytesRejectThreshold(long maxSizeBytesRejectThreshold) {
+      this.maxSizeBytesRejectThreshold = maxSizeBytesRejectThreshold;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -456,6 +471,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (queuePrefetch == null) {
          queuePrefetch = merged.queuePrefetch;
       }
+      if (maxSizeBytesRejectThreshold == null) {
+         maxSizeBytesRejectThreshold = merged.maxSizeBytesRejectThreshold;
+      }
    }
 
    @Override
@@ -521,6 +539,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       autoDeleteJmsTopics = BufferHelper.readNullableBoolean(buffer);
 
       managementBrowsePageSize = BufferHelper.readNullableInteger(buffer);
+
+      maxSizeBytesRejectThreshold = BufferHelper.readNullableLong(buffer);
    }
 
    @Override
@@ -549,7 +569,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(autoDeleteJmsQueues) +
          BufferHelper.sizeOfNullableBoolean(autoCreateJmsTopics) +
          BufferHelper.sizeOfNullableBoolean(autoDeleteJmsTopics) +
-         BufferHelper.sizeOfNullableInteger(managementBrowsePageSize);
+         BufferHelper.sizeOfNullableInteger(managementBrowsePageSize) +
+         BufferHelper.sizeOfNullableLong(maxSizeBytesRejectThreshold);
    }
 
    @Override
@@ -601,6 +622,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       BufferHelper.writeNullableBoolean(buffer, autoDeleteJmsTopics);
 
       BufferHelper.writeNullableInteger(buffer, managementBrowsePageSize);
+
+      BufferHelper.writeNullableLong(buffer, maxSizeBytesRejectThreshold);
    }
 
    /* (non-Javadoc)
@@ -635,6 +658,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((autoDeleteJmsTopics == null) ? 0 : autoDeleteJmsTopics.hashCode());
       result = prime * result + ((managementBrowsePageSize == null) ? 0 : managementBrowsePageSize.hashCode());
       result = prime * result + ((queuePrefetch == null) ? 0 : queuePrefetch.hashCode());
+      result = prime * result + ((maxSizeBytesRejectThreshold == null) ? 0 : maxSizeBytesRejectThreshold.hashCode());
       return result;
    }
 
@@ -802,6 +826,13 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       }
       else if (!queuePrefetch.equals(other.queuePrefetch))
          return false;
+
+      if (maxSizeBytesRejectThreshold == null) {
+         if (other.maxSizeBytesRejectThreshold != null)
+            return false;
+      }
+      else if (!maxSizeBytesRejectThreshold.equals(other.maxSizeBytesRejectThreshold))
+         return false;
       return true;
    }
 
@@ -825,6 +856,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          maxDeliveryAttempts +
          ", maxSizeBytes=" +
          maxSizeBytes +
+         ", maxSizeBytesRejectThreshold=" +
+         maxSizeBytesRejectThreshold +
          ", messageCounterHistoryDayLimit=" +
          messageCounterHistoryDayLimit +
          ", pageSizeBytes=" +

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5ac86a0..815ef7c 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -2220,7 +2220,15 @@
             <xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
-                     the maximum size (in bytes) to use in paging for an address (-1 means no limits)
+                     the maximum size (in bytes) for an address (-1 means no limits).  This is used in PAGING, BLOCK and FAIL policies.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected.  Works in combination with max-size-bytes for AMQP protocol only.  Default = -1 (no limit).
                   </xsd:documentation>
                </xsd:annotation>
             </xsd:element>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
index 58f7c99..202f2ba 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/settings/AddressSettingsTest.java
@@ -59,6 +59,8 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
       addressSettingsToMerge.setRedeliveryDelay(1003);
       addressSettingsToMerge.setPageSizeBytes(1004);
+      addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
+
       addressSettings.merge(addressSettingsToMerge);
       Assert.assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
       Assert.assertEquals(addressSettings.getExpiryAddress(), exp);
@@ -68,6 +70,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       Assert.assertEquals(addressSettings.getRedeliveryDelay(), 1003);
       Assert.assertEquals(addressSettings.getPageSizeBytes(), 1004);
       Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
+      Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
    }
 
    @Test
@@ -82,6 +85,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       addressSettingsToMerge.setMaxSizeBytes(1001);
       addressSettingsToMerge.setMessageCounterHistoryDayLimit(1002);
       addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.DROP);
+      addressSettingsToMerge.setMaxSizeBytesRejectThreshold(10 * 1024);
       addressSettings.merge(addressSettingsToMerge);
 
       AddressSettings addressSettingsToMerge2 = new AddressSettings();
@@ -100,6 +104,7 @@ public class AddressSettingsTest extends ActiveMQTestBase {
       Assert.assertEquals(addressSettings.getRedeliveryDelay(), 2003);
       Assert.assertEquals(addressSettings.getRedeliveryMultiplier(), 2.5, 0.000001);
       Assert.assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
+      Assert.assertEquals(addressSettings.getMaxSizeBytesRejectThreshold(), 10 * 1024);
    }
 
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/docs/user-manual/en/flow-control.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/flow-control.md b/docs/user-manual/en/flow-control.md
index c1b4035..8a11966 100644
--- a/docs/user-manual/en/flow-control.md
+++ b/docs/user-manual/en/flow-control.md
@@ -275,25 +275,25 @@ control.
 
 #### Blocking producer window based flow control using AMQP
 
-Apache ActiveMQ Artemis ships with out of the box with 2 protocols that support
-flow control.  Artemis CORE protocol and AMQP.  Both protocols implement flow
-control slightly differently and therefore address full BLOCK policy behaves
-slightly different for clients uses each protocol respectively.
-
-As explained earlier in this chapter the CORE protocol uses a producer window size
-flow control system.  Where credits (representing bytes) are allocated to producers,
-if a producer wants to send a message it should wait until it has enough bytes available
-to send it.  AMQP flow control credits are not representative of bytes but instead represent
-the number of messages a producer is permitted to send (regardless of size).
-
-BLOCK for AMQP works mostly in the same way as the producer window size mechanism above.  Artemis
-will issue 100 credits to a client at a time and refresh them when the clients credits reaches 30.
-The broker will stop issuing credits once an address is full.  However, since AMQP credits represent
-whole messages and not bytes, it would be possible for an AMQP client to significantly exceed an
-address upper bound should the broker continue accepting messages until the clients credits are exhausted.
-For this reason once an address has reached it's upper bound and is blocked (when using AMQP) Artemis
-will start rejecting messages until the address becomes unblocked.  This should be taken into consideration when writing
-application code.
+Apache ActiveMQ Artemis ships out of the box with 2 protocols that support flow control: the Artemis CORE protocol and
+AMQP. Both protocols implement flow control slightly differently and therefore the address full BLOCK policy behaves
+slightly differently for clients that use each protocol.
+
+As explained earlier in this chapter the CORE protocol uses a producer window size flow control system, where credits
+(representing bytes) are allocated to producers; if a producer wants to send a message it should wait until it has
+enough byte credits available for it to send. AMQP flow control credits are not representative of bytes but instead
+represent the number of messages a producer is permitted to send (regardless of the message size).
+
+BLOCK for AMQP works mostly in the same way as the producer window size mechanism above. Artemis will issue 100 credits
+to a client at a time and refresh them when the client's credits reach 30. The broker will stop issuing credits once an
+address is full. However, since AMQP credits represent whole messages and not bytes, it would be possible in some
+scenarios for an AMQP client to significantly exceed an address upper bound should the broker continue accepting
+messages until the client's credits are exhausted. For this reason there is an additional parameter available on address
+settings that specifies an upper bound on an address size in bytes. Once this upper bound is reached Artemis will start
+rejecting AMQP messages. This limit is the max-size-bytes-reject-threshold and is by default set to -1 (or no limit).
+This additional parameter allows a kind of soft and hard limit: in normal circumstances the broker will utilize the
+max-size-bytes parameter, using flow control to put back pressure on the client, but will protect itself by
+rejecting messages once the max-size-bytes-reject-threshold is reached.
 
 ### Rate limited flow control
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 28e38f2..82b6aec 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -412,7 +412,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
       return txContext.getTransactionId();
    }
 
-   AmqpTransactionContext getTransactionContext() {
+   public AmqpTransactionContext getTransactionContext() {
       return txContext;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
index dcf23d2..2f3e22a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpTransactionContext.java
@@ -213,7 +213,7 @@ public class AmqpTransactionContext {
 
    //----- Internal access to context properties ----------------------------//
 
-   AmqpTransactionCoordinator getCoordinator() {
+   public AmqpTransactionCoordinator getCoordinator() {
       return coordinator;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index b170f82..785543d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -95,8 +95,13 @@ public class ProtonTest extends ActiveMQTestBase {
 
    private static final String password = "guest";
 
+
    private static final String brokerName = "my-broker";
 
+   private static final long maxSizeBytes = 1 * 1024 * 1024;
+
+   private static final long maxSizeBytesRejectThreshold = 2 * 1024 * 1024;
+
    // this will ensure that all tests in this class are run twice,
    // once with "true" passed to the class' constructor and once with "false"
    @Parameterized.Parameters(name = "{0}")
@@ -310,6 +315,7 @@ public class ProtonTest extends ActiveMQTestBase {
       Assert.assertEquals(q.getMessageCount(), 0);
    }
 
+
    @Test
    public void testRollbackConsumer() throws Throwable {
 
@@ -342,8 +348,11 @@ public class ProtonTest extends ActiveMQTestBase {
    public void testResourceLimitExceptionOnAddressFull() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
       setAddressFullBlockPolicy();
+      String destinationAddress = address + 1;
+      fillAddress(destinationAddress);
 
-      fillAddress(address + 1);
+      long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+      assertTrue(addressSize >= maxSizeBytesRejectThreshold);
    }
 
    @Test
@@ -367,6 +376,9 @@ public class ProtonTest extends ActiveMQTestBase {
       }
       assertTrue(e instanceof ResourceAllocationException);
       assertTrue(e.getMessage().contains("resource-limit-exceeded"));
+
+      long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+      assertTrue(addressSize >= maxSizeBytesRejectThreshold);
    }
 
    @Test
@@ -393,6 +405,9 @@ public class ProtonTest extends ActiveMQTestBase {
 
          // This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
          assertTrue(sender.getSender().getCredit() == -1);
+
+         long addressSize = server.getPagingManager().getPageStore(new SimpleString(destinationAddress)).getAddressSize();
+         assertTrue(addressSize >= maxSizeBytes && addressSize <= maxSizeBytesRejectThreshold);
       }
       finally {
          amqpConnection.close();
@@ -446,7 +461,7 @@ public class ProtonTest extends ActiveMQTestBase {
 
       fillAddress(address + 1);
       AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
-      AmqpConnection amqpConnection = amqpConnection = client.connect();
+      AmqpConnection amqpConnection = client.connect();
       try {
          AmqpSession session = amqpConnection.createSession();
          AmqpSender sender = session.createSender(address + 1);
@@ -459,6 +474,43 @@ public class ProtonTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testTxIsRolledBackOnRejectedPreSettledMessage() throws Throwable {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+      setAddressFullBlockPolicy();
+
+      // Create the link attach before filling the address to ensure the link is allocated credit.
+      AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+      AmqpConnection amqpConnection = client.connect();
+
+      AmqpSession session = amqpConnection.createSession();
+      AmqpSender sender = session.createSender(address);
+      sender.setPresettle(true);
+
+      fillAddress(address);
+
+      final AmqpMessage message = new AmqpMessage();
+      byte[] payload = new byte[50 * 1024];
+      message.setBytes(payload);
+
+      Exception expectedException = null;
+      try {
+         session.begin();
+         sender.send(message);
+         session.commit();
+      }
+      catch (Exception e) {
+         expectedException = e;
+      }
+      finally {
+         amqpConnection.close();
+      }
+
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains("resource-limit-exceeded"));
+      assertTrue(expectedException.getMessage().contains("Address is full: " + address));
+   }
+
    /**
     * Fills an address.  Careful when using this method.  Only use when rejected messages are switched on.
     * @param address
@@ -520,6 +572,7 @@ public class ProtonTest extends ActiveMQTestBase {
 
       timeout.await(5, TimeUnit.SECONDS);
 
+      System.out.println("Messages Sent: " + sentMessages);
       if (errors[0] != null) {
          throw errors[0];
       }
@@ -1313,7 +1366,8 @@ public class ProtonTest extends ActiveMQTestBase {
       // For BLOCK tests
       AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
-      addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+      addressSettings.setMaxSizeBytes(maxSizeBytes);
+      addressSettings.setMaxSizeBytesRejectThreshold(maxSizeBytesRejectThreshold);
       server.getAddressSettingsRepository().addMatch("#", addressSettings);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f721866/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
----------------------------------------------------------------------
diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 33ee0c7..6c42413 100644
--- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -307,6 +307,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
       }
 
       @Override
+      public boolean isRejectingMessages() {
+         return false;
+      }
+
+      @Override
       public void applySetting(AddressSettings addressSettings) {
 
       }


[3/3] activemq-artemis git commit: This closes #690

Posted by cl...@apache.org.
This closes #690


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/410cd91f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/410cd91f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/410cd91f

Branch: refs/heads/master
Commit: 410cd91f6f554dcb34f9529f117fc0052e29e5e1
Parents: d871dfe 2f72186
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 5 10:35:01 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 5 10:35:01 2016 -0400

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQException.java     | 10 ++++
 .../artemis/api/core/ActiveMQExceptionType.java |  3 +-
 .../plug/ProtonSessionIntegrationCallback.java  | 30 +++++++---
 artemis-protocols/artemis-proton-plug/pom.xml   |  5 ++
 .../plug/context/ProtonTransactionHandler.java  | 29 ++++++----
 .../plug/exceptions/ActiveMQAMQPException.java  | 12 ++--
 .../ActiveMQAMQPIllegalStateException.java      |  3 +-
 .../ActiveMQAMQPInternalErrorException.java     |  5 +-
 .../ActiveMQAMQPInvalidFieldException.java      |  3 +-
 .../ActiveMQAMQPNotFoundException.java          |  7 +--
 .../ActiveMQAMQPNotImplementedException.java    |  5 +-
 ...iveMQAMQPResourceLimitExceededException.java | 27 +++++++++
 .../ActiveMQAMQPTimeoutException.java           |  3 +-
 .../artemis/core/paging/PagingStore.java        |  2 +
 .../core/paging/impl/PagingStoreImpl.java       | 12 ++++
 .../core/settings/impl/AddressSettings.java     | 35 +++++++++++-
 .../resources/schema/artemis-configuration.xsd  | 10 +++-
 .../core/settings/AddressSettingsTest.java      |  5 ++
 docs/user-manual/en/flow-control.md             | 38 ++++++-------
 .../transport/amqp/client/AmqpSession.java      |  2 +-
 .../amqp/client/AmqpTransactionContext.java     |  2 +-
 .../tests/integration/proton/ProtonTest.java    | 60 +++++++++++++++++++-
 .../storage/PersistMultiThreadTest.java         |  5 ++
 23 files changed, 252 insertions(+), 61 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-artemis git commit: ARTEMIS-667 Make AMQP Exceptions extend ActiveMQException

Posted by cl...@apache.org.
ARTEMIS-667 Make AMQP Exceptions extend ActiveMQException


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/06fb4a12
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/06fb4a12
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/06fb4a12

Branch: refs/heads/master
Commit: 06fb4a1234887737b9e4d02c75e51836d286023a
Parents: d871dfe
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Aug 4 13:58:30 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Aug 5 15:29:01 2016 +0100

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQException.java     | 10 ++++++++
 .../artemis/api/core/ActiveMQExceptionType.java |  3 ++-
 artemis-protocols/artemis-proton-plug/pom.xml   |  5 ++++
 .../plug/exceptions/ActiveMQAMQPException.java  | 12 +++++----
 .../ActiveMQAMQPIllegalStateException.java      |  3 ++-
 .../ActiveMQAMQPInternalErrorException.java     |  5 ++--
 .../ActiveMQAMQPInvalidFieldException.java      |  3 ++-
 .../ActiveMQAMQPNotFoundException.java          |  7 ++---
 .../ActiveMQAMQPNotImplementedException.java    |  5 ++--
 ...iveMQAMQPResourceLimitExceededException.java | 27 ++++++++++++++++++++
 .../ActiveMQAMQPTimeoutException.java           |  3 ++-
 11 files changed, 65 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
index 13a35b1..6404c74 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQException.java
@@ -34,6 +34,16 @@ public class ActiveMQException extends Exception {
       type = ActiveMQExceptionType.GENERIC_EXCEPTION;
    }
 
+   public ActiveMQException(String msg, ActiveMQExceptionType t) {
+      super(msg);
+      type = t;
+   }
+
+   public ActiveMQException(String message, Throwable t, ActiveMQExceptionType type) {
+      super(message, t);
+      this.type = type;
+   }
+
    /*
    * This constructor is needed only for the native layer
    */

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index f135653..eb4bf5d 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -201,7 +201,8 @@ public enum ActiveMQExceptionType {
          return new ActiveMQClusterSecurityException(msg);
       }
 
-   };
+   },
+   NOT_IMPLEMTNED_EXCEPTION(213);
 
    private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/pom.xml
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/pom.xml b/artemis-protocols/artemis-proton-plug/pom.xml
index 684b3ce..b4876cf 100644
--- a/artemis-protocols/artemis-proton-plug/pom.xml
+++ b/artemis-protocols/artemis-proton-plug/pom.xml
@@ -77,6 +77,11 @@
          <version>${project.version}</version>
       </dependency>
       <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>artemis-commons</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
          <groupId>org.apache.qpid</groupId>
          <artifactId>proton-j</artifactId>
       </dependency>

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
index 6e240e3..4838d55 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPException.java
@@ -16,9 +16,11 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.Symbol;
 
-public class ActiveMQAMQPException extends Exception {
+public class ActiveMQAMQPException extends ActiveMQException {
 
    private static final String ERROR_PREFIX = "amqp:";
 
@@ -28,13 +30,13 @@ public class ActiveMQAMQPException extends Exception {
 
    private final Symbol amqpError;
 
-   public ActiveMQAMQPException(Symbol amqpError, String message, Throwable e) {
-      super(message, e);
+   public ActiveMQAMQPException(Symbol amqpError, String message, Throwable e, ActiveMQExceptionType t) {
+      super(message, e, t);
       this.amqpError = amqpError;
    }
 
-   public ActiveMQAMQPException(Symbol amqpError, String message) {
-      super(message);
+   public ActiveMQAMQPException(Symbol amqpError, String message, ActiveMQExceptionType t) {
+      super(message, t);
       this.amqpError = amqpError;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
index cdbf4fa..7818ef9 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPIllegalStateException.java
@@ -16,11 +16,12 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 
 public class ActiveMQAMQPIllegalStateException extends ActiveMQAMQPException {
 
    public ActiveMQAMQPIllegalStateException(String message) {
-      super(AmqpError.ILLEGAL_STATE, message);
+      super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.ILLEGAL_STATE);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
index e30073c..2c0b0ae 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInternalErrorException.java
@@ -16,15 +16,16 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 
 public class ActiveMQAMQPInternalErrorException extends ActiveMQAMQPException {
 
    public ActiveMQAMQPInternalErrorException(String message, Throwable e) {
-      super(AmqpError.INTERNAL_ERROR, message, e);
+      super(AmqpError.INTERNAL_ERROR, message, e, ActiveMQExceptionType.INTERNAL_ERROR);
    }
 
    public ActiveMQAMQPInternalErrorException(String message) {
-      super(AmqpError.INTERNAL_ERROR, message);
+      super(AmqpError.INTERNAL_ERROR, message, ActiveMQExceptionType.INTERNAL_ERROR);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
index c6978a2..f5dd168 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPInvalidFieldException.java
@@ -16,11 +16,12 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 
 public class ActiveMQAMQPInvalidFieldException extends ActiveMQAMQPException {
 
    public ActiveMQAMQPInvalidFieldException(String message) {
-      super(AmqpError.INVALID_FIELD, message);
+      super(AmqpError.INVALID_FIELD, message, ActiveMQExceptionType.ILLEGAL_STATE);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
index dc4c400..02cc15c 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotFoundException.java
@@ -16,15 +16,12 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 
 public class ActiveMQAMQPNotFoundException extends ActiveMQAMQPException {
 
-   public ActiveMQAMQPNotFoundException(String message, Throwable e) {
-      super(AmqpError.NOT_FOUND, message, e);
-   }
-
    public ActiveMQAMQPNotFoundException(String message) {
-      super(AmqpError.NOT_FOUND, message);
+      super(AmqpError.NOT_FOUND, message, ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
index 6a1c95c..861e236 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPNotImplementedException.java
@@ -16,11 +16,12 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 
 public class ActiveMQAMQPNotImplementedException extends ActiveMQAMQPException {
 
    public ActiveMQAMQPNotImplementedException(String message) {
-      super(AmqpError.NOT_IMPLEMENTED, message);
+      super(AmqpError.NOT_IMPLEMENTED, message, ActiveMQExceptionType.NOT_IMPLEMTNED_EXCEPTION);
    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
new file mode 100644
index 0000000..2c64a8d
--- /dev/null
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPResourceLimitExceededException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.proton.plug.exceptions;
+
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+
+public class ActiveMQAMQPResourceLimitExceededException extends ActiveMQAMQPException {
+
+   public ActiveMQAMQPResourceLimitExceededException(String message) {
+      super(AmqpError.RESOURCE_LIMIT_EXCEEDED, message, ActiveMQExceptionType.ADDRESS_FULL);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/06fb4a12/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
index 25b4ea6..c86c25d 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/exceptions/ActiveMQAMQPTimeoutException.java
@@ -16,12 +16,13 @@
  */
 package org.proton.plug.exceptions;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 
 public class ActiveMQAMQPTimeoutException extends ActiveMQAMQPException {
 
    public ActiveMQAMQPTimeoutException(String message) {
-      super(AmqpError.ILLEGAL_STATE, message);
+      super(AmqpError.ILLEGAL_STATE, message, ActiveMQExceptionType.CONNECTION_TIMEDOUT);
    }
 
 }