You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/18 00:53:22 UTC

[3/5] activemq-artemis git commit: ARTEMIS-2117 Add custom LVQ Key and Non Destructive Queue into Broker

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index b09d310..b863181 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -51,6 +51,14 @@ public class QueueQueryResult {
 
    private Boolean lastValue;
 
+   private SimpleString lastValueKey;
+
+   private Boolean nonDestructive;
+
+   private Integer consumersBeforeDispatch;
+
+   private Long delayBeforeDispatch;
+
    private Integer defaultConsumerWindowSize;
 
    public QueueQueryResult(final SimpleString name,
@@ -68,6 +76,10 @@ public class QueueQueryResult {
                            final int maxConsumers,
                            final Boolean exclusive,
                            final Boolean lastValue,
+                           final SimpleString lastValueKey,
+                           final Boolean nonDestructive,
+                           final Integer consumersBeforeDispatch,
+                           final Long delayBeforeDispatch,
                            final Integer defaultConsumerWindowSize) {
       this.durable = durable;
 
@@ -99,6 +111,14 @@ public class QueueQueryResult {
 
       this.lastValue = lastValue;
 
+      this.lastValueKey = lastValueKey;
+
+      this.nonDestructive = nonDestructive;
+
+      this.consumersBeforeDispatch = consumersBeforeDispatch;
+
+      this.delayBeforeDispatch = delayBeforeDispatch;
+
       this.defaultConsumerWindowSize = defaultConsumerWindowSize;
    }
 
@@ -166,6 +186,22 @@ public class QueueQueryResult {
       return lastValue;
    }
 
+   public SimpleString getLastValueKey() {
+      return lastValueKey;
+   }
+
+   public Boolean isNonDestructive() {
+      return nonDestructive;
+   }
+
+   public Integer getConsumersBeforeDispatch() {
+      return consumersBeforeDispatch;
+   }
+
+   public Long getDelayBeforeDispatch() {
+      return delayBeforeDispatch;
+   }
+
    public Integer getDefaultConsumerWindowSize() {
       return defaultConsumerWindowSize;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 0d86354..e25441b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executor;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ICoreMessage;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueAttributes;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -186,6 +187,19 @@ public abstract class SessionContext {
                                           Boolean exclusive,
                                           Boolean lastValue) throws ActiveMQException;
 
+   /**
+    * Creates a shared queue using the routing type set by the Address.  If the Address supports more than one type of delivery
+    * then the default delivery mode (MULTICAST) is used.
+    *
+    * @param address
+    * @param queueName
+    * @param queueAttributes
+    * @throws ActiveMQException
+    */
+   public abstract void createSharedQueue(SimpleString address,
+                                          SimpleString queueName,
+                                          QueueAttributes queueAttributes) throws ActiveMQException;
+
    public abstract void createSharedQueue(SimpleString address,
                                           SimpleString queueName,
                                           RoutingType routingType,
@@ -235,6 +249,12 @@ public abstract class SessionContext {
                                     Boolean exclusive,
                                     Boolean lastVale) throws ActiveMQException;
 
+   public abstract void createQueue(SimpleString address,
+                                    SimpleString queueName,
+                                    boolean temp,
+                                    boolean autoCreated,
+                                    QueueAttributes queueAttributes) throws ActiveMQException;
+
    public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) throws ActiveMQException;
 
    public abstract void forceDelivery(ClientConsumer consumer, long sequence) throws ActiveMQException;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index ee4223c..74f39ee 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
-import org.apache.activemq.artemis.api.core.QueueAttributes;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -61,7 +60,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
    private final SimpleString connID;
 
    private final ClientProducer clientProducer;
-   private final ClientSession clientSession;
+   private final ActiveMQSession session;
 
    private boolean disableMessageID = false;
 
@@ -78,7 +77,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
    protected ActiveMQMessageProducer(final ActiveMQConnection connection,
                                      final ClientProducer producer,
                                      final ActiveMQDestination defaultDestination,
-                                     final ClientSession clientSession,
+                                     final ActiveMQSession session,
                                      final ConnectionFactoryOptions options) throws JMSException {
       this.options = options;
       this.connection = connection;
@@ -89,7 +88,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
 
       this.defaultDestination = defaultDestination;
 
-      this.clientSession = clientSession;
+      this.session = session;
    }
 
    // MessageProducer implementation --------------------------------
@@ -388,6 +387,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       }
 
       SimpleString address = null;
+      ClientSession clientSession = session.getCoreSession();
 
       if (destination == null) {
          if (defaultDestination == null) {
@@ -413,9 +413,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
                      clientSession.createAddress(address, RoutingType.ANYCAST, true);
                      if (destination.isTemporary()) {
                         // TODO is it right to use the address for the queue name here?
-                        clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
+                        session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
                      } else {
-                        createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers(), query.isDefaultExclusive(), query.isDefaultLastValueQueue());
+                        session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
                      }
                   } else if (!destination.isQueue() && query.isAutoCreateAddresses()) {
                      clientSession.createAddress(address, RoutingType.MULTICAST, true);
@@ -428,9 +428,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
                      connection.addKnownDestination(address);
                   } else if (destination.isQueue() && query.isAutoCreateQueues()) {
                      if (destination.isTemporary()) {
-                        clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
+                        session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
                      } else {
-                        createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers(), query.isDefaultExclusive(), query.isDefaultLastValueQueue());
+                        session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
                      }
                   }
                }
@@ -450,7 +450,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       if (!(jmsMessage instanceof ActiveMQMessage)) {
          // JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
          // a message whose implementation is not one of its own.
-
          if (jmsMessage instanceof BytesMessage) {
             activeMQJmsMessage = new ActiveMQBytesMessage((BytesMessage) jmsMessage, clientSession);
          } else if (jmsMessage instanceof MapMessage) {
@@ -533,29 +532,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
    }
 
    private void checkClosed() throws JMSException {
-      if (clientProducer.isClosed() || clientSession.isClosed()) {
+      if (clientProducer.isClosed()) {
          throw new IllegalStateException("Producer is closed");
       }
-   }
-
-   private void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
-      QueueAttributes queueAttributes = destination.getQueueAttributes();
-      if (queueAttributes == null) {
-         clientSession.createQueue(destination.getSimpleAddress(), routingType, queueName, filter, durable, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
-      } else {
-         clientSession.createQueue(
-            destination.getSimpleAddress(),
-            routingType,
-            queueName,
-            filter,
-            durable,
-            autoCreated,
-            queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
-            queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
-            queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
-            queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
-         );
-      }
+      session.checkClosed();
    }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 95d3608..5b75f7f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -374,7 +374,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                   if (jbd.isQueue() && response.isAutoCreateQueues()) {
                      // perhaps just relying on the broker to do it is simplest (i.e. purgeOnNoConsumers)
                      session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
-                     createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+                     createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response);
                   } else if (!jbd.isQueue() && response.isAutoCreateAddresses()) {
                      session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
                   } else {
@@ -389,7 +389,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
          ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
 
-         return new ActiveMQMessageProducer(connection, producer, jbd, session, options);
+         return new ActiveMQMessageProducer(connection, producer, jbd, this, options);
       } catch (ActiveMQException e) {
          throw JMSExceptionHelper.convertFromActiveMQException(e);
       }
@@ -699,9 +699,9 @@ public class ActiveMQSession implements QueueSession, TopicSession {
          if (!(subResponse.isExists() && Objects.equals(subResponse.getAddress(), dest.getSimpleAddress()) && Objects.equals(subResponse.getFilterString(), coreFilterString))) {
             try {
                if (durability == ConsumerDurability.DURABLE) {
-                  createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+                  createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response);
                } else {
-                  createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+                  createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response);
                }
             } catch (ActiveMQQueueExistsException ignored) {
                // We ignore this because querying and then creating the queue wouldn't be idempotent
@@ -768,7 +768,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
             if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
                if (response.isAutoCreateQueues()) {
                   try {
-                     createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+                     createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);
                   } catch (ActiveMQQueueExistsException e) {
                      // The queue was created by another client/admin between the query check and send create queue packet
                   }
@@ -802,7 +802,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
                queueName = new SimpleString(UUID.randomUUID().toString());
 
-               createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+               createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
 
                consumer = session.createConsumer(queueName, null, false);
 
@@ -825,7 +825,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
                if (!subResponse.isExists()) {
                   // durable subscription queues are not technically considered to be auto-created
-                  createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+                  createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response);
                } else {
                   // Already exists
                   if (subResponse.getConsumerCount() > 0) {
@@ -856,7 +856,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
                      session.deleteQueue(queueName);
 
                      // Create the new one
-                     createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+                     createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response);
                   }
                }
 
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
          AddressQuery response = session.addressQuery(new SimpleString(activeMQDestination.getAddress()));
          if (!response.isExists()) {
             if (response.isAutoCreateQueues()) {
-               createQueue(activeMQDestination, RoutingType.ANYCAST, activeMQDestination.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+               createQueue(activeMQDestination, RoutingType.ANYCAST, activeMQDestination.getSimpleAddress(), null, true, true, response);
             } else {
                throw new InvalidDestinationException("Destination " + activeMQDestination.getName() + " does not exist");
             }
@@ -1192,14 +1192,45 @@ public class ActiveMQSession implements QueueSession, TopicSession {
 
    // Protected -----------------------------------------------------
 
-   // Private -------------------------------------------------------
 
-   private void checkClosed() throws JMSException {
+   void checkClosed() throws JMSException {
       if (session.isClosed()) {
          throw new IllegalStateException("Session is closed");
       }
    }
 
+   void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
+      QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+      setRequiredQueueAttributesIfNotSet(queueAttributes, addressQuery, routingType, filter, false);
+      session.createTemporaryQueue(
+              destination.getSimpleAddress(),
+              queueName,
+              queueAttributes
+      );
+   }
+
+   void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
+      QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+      setRequiredQueueAttributesIfNotSet(queueAttributes, addressQuery, routingType, filter, durable);
+      session.createSharedQueue(
+              destination.getSimpleAddress(),
+              queueName,
+              queueAttributes);
+   }
+
+   void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
+      QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+      setRequiredQueueAttributesIfNotSet(queueAttributes, addressQuery, routingType, filter, durable);
+      session.createQueue(
+              destination.getSimpleAddress(),
+              queueName,
+              autoCreated,
+              queueAttributes);
+   }
+
+   // Private -------------------------------------------------------
+
+
    private ActiveMQQueue lookupQueue(final String queueName, boolean isTemporary) throws ActiveMQException {
       String queueNameToUse = queueName;
       if (enable1xPrefixes) {
@@ -1246,63 +1277,34 @@ public class ActiveMQSession implements QueueSession, TopicSession {
       }
    }
 
-   private void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
-      QueueAttributes queueAttributes = destination.getQueueAttributes();
-      if (queueAttributes == null) {
-         session.createTemporaryQueue(destination.getSimpleAddress(), routingType, queueName, filter, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
-      } else {
-         session.createTemporaryQueue(
-            destination.getSimpleAddress(),
-            routingType,
-            queueName,
-            filter,
-            queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
-            queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
-            queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
-            queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
-         );
-      }
-   }
-
-   private void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
-      QueueAttributes queueAttributes = destination.getQueueAttributes();
-      if (queueAttributes == null) {
-         session.createSharedQueue(destination.getSimpleAddress(), routingType, queueName, filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
-      } else {
-         session.createSharedQueue(
-            destination.getSimpleAddress(),
-            routingType,
-            queueName,
-            filter,
-            durable,
-            queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
-            queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
-            queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
-            queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
-         );
-      }
-   }
-
-   private void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
-      QueueAttributes queueAttributes = destination.getQueueAttributes();
-      if (queueAttributes == null) {
-         session.createQueue(destination.getSimpleAddress(), routingType, queueName, filter, durable, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
-      } else {
-         session.createQueue(
-            destination.getSimpleAddress(),
-            routingType,
-            queueName,
-            filter,
-            durable,
-            autoCreated,
-            queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
-            queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
-            queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
-            queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
-         );
+   /**
+    * Sets the queue attributes that are non-nullable in CreateQueueMessage_V2, if not already set (all other attributes, when null, are defaulted server side from the address settings).
+    *
+    * @param queueAttributes the provided queue attributes the client wants to set
+    * @param addressQuery the address settings query information (this parameter could be removed if max consumers and purge on no consumers were nullable in CreateQueueMessage_V2)
+    * @param routingType of the queue (multicast or anycast)
+    * @param filter to apply on the queue
+    * @param durable if queue is durable
+    */
+   private void setRequiredQueueAttributesIfNotSet(QueueAttributes queueAttributes, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) {
+      if (queueAttributes.getRoutingType() == null) {
+         queueAttributes.setRoutingType(routingType);
+      }
+      if (queueAttributes.getFilterString() == null) {
+         queueAttributes.setFilterString(filter);
+      }
+      if (queueAttributes.getDurable() == null) {
+         queueAttributes.setDurable(durable);
+      }
+      if (queueAttributes.getMaxConsumers() == null) {
+         queueAttributes.setMaxConsumers(addressQuery.getDefaultMaxConsumers());
+      }
+      if (queueAttributes.getPurgeOnNoConsumers() == null) {
+         queueAttributes.setPurgeOnNoConsumers(addressQuery.isDefaultPurgeOnNoConsumers());
       }
    }
 
+
    // Inner classes -------------------------------------------------
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
index f06772c..235d699 100644
--- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
+++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -72,7 +72,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
    public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
       SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
 
-      return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue());
+      return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index fae6ef7..e522f37 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -293,7 +293,9 @@ public class AMQConsumer {
       }
 
       boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists
-
+      if (serverConsumer.getQueue().isNonDestructive()) {
+         removeReferences = false;
+      }
       if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
          removeReferences = false;
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index 87e938e..50fee8e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -39,6 +39,10 @@ public class CoreQueueConfiguration implements Serializable {
 
    private Boolean lastValue;
 
+   private String lastValueKey;
+
+   private Boolean nonDestructive;
+
    private Integer maxConsumers;
 
    private Integer consumersBeforeDispatch;
@@ -80,6 +84,14 @@ public class CoreQueueConfiguration implements Serializable {
       return lastValue;
    }
 
+   public String getLastValueKey() {
+      return lastValueKey;
+   }
+
+   public Boolean isNonDestructive() {
+      return nonDestructive;
+   }
+
    public Integer getConsumersBeforeDispatch() {
       return consumersBeforeDispatch;
    }
@@ -170,6 +182,16 @@ public class CoreQueueConfiguration implements Serializable {
       return this;
    }
 
+   public CoreQueueConfiguration setLastValueKey(String lastValueKey) {
+      this.lastValueKey = lastValueKey;
+      return this;
+   }
+
+   public CoreQueueConfiguration setNonDestructive(Boolean nonDestructive) {
+      this.nonDestructive = nonDestructive;
+      return this;
+   }
+
    public boolean getPurgeOnNoConsumers() {
       return purgeOnNoConsumers;
    }
@@ -199,6 +221,8 @@ public class CoreQueueConfiguration implements Serializable {
       result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode());
       result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
       result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
+      result = prime * result + ((lastValueKey == null) ? 0 : lastValueKey.hashCode());
+      result = prime * result + ((nonDestructive == null) ? 0 : nonDestructive.hashCode());
       result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
       result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
       result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
@@ -254,6 +278,18 @@ public class CoreQueueConfiguration implements Serializable {
       } else if (!lastValue.equals(other.lastValue)) {
          return false;
       }
+      if (lastValueKey == null) {
+         if (other.lastValueKey != null)
+            return false;
+      } else if (!lastValueKey.equals(other.lastValueKey)) {
+         return false;
+      }
+      if (nonDestructive == null) {
+         if (other.nonDestructive != null)
+            return false;
+      } else if (!nonDestructive.equals(other.nonDestructive)) {
+         return false;
+      }
       if (consumersBeforeDispatch == null) {
          if (other.consumersBeforeDispatch != null)
             return false;
@@ -287,6 +323,8 @@ public class CoreQueueConfiguration implements Serializable {
          ", purgeOnNoConsumers=" + purgeOnNoConsumers +
          ", exclusive=" + exclusive +
          ", lastValue=" + lastValue +
+         ", lastValueKey=" + lastValueKey +
+         ", nonDestructive=" + nonDestructive +
          ", consumersBeforeDispatch=" + consumersBeforeDispatch +
          ", delayBeforeDispatch=" + delayBeforeDispatch +
          "]";

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 9bc292b..5b0a3a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -181,6 +181,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String DEFAULT_LVQ_NODE_NAME = "default-last-value-queue";
 
+   private static final String DEFAULT_LVQ_KEY_NODE_NAME = "default-last-value-key";
+
+   private static final String DEFAULT_NON_DESTRUCTIVE_NODE_NAME = "default-non-destructive";
+
    private static final String DEFAULT_EXCLUSIVE_NODE_NAME = "default-exclusive-queue";
 
    private static final String DEFAULT_CONSUMERS_BEFORE_DISPATCH = "default-consumers-before-dispatch";
@@ -1000,6 +1004,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setAddressFullMessagePolicy(policy);
          } else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child));
+         } else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultLastValueKey(SimpleString.toSimpleString(getTrimmedTextContent(child)));
+         } else if (DEFAULT_NON_DESTRUCTIVE_NODE_NAME.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultNonDestructive(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_EXCLUSIVE_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setDefaultExclusiveQueue(XMLUtil.parseBoolean(child));
          } else if (MAX_DELIVERY_ATTEMPTS.equalsIgnoreCase(name)) {
@@ -1109,6 +1117,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       String user = null;
       Boolean exclusive = null;
       Boolean lastValue = null;
+      String lastValueKey = null;
+      Boolean nonDestructive = null;
       Integer consumersBeforeDispatch = null;
       Long delayBeforeDispatch = null;
 
@@ -1124,6 +1134,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             exclusive = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("last-value")) {
             lastValue = Boolean.parseBoolean(item.getNodeValue());
+         } else if (item.getNodeName().equals("last-value-key")) {
+            lastValueKey = item.getNodeValue();
+         } else if (item.getNodeName().equals("non-destructive")) {
+            nonDestructive = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("consumers-before-dispatch")) {
             consumersBeforeDispatch = Integer.parseInt(item.getNodeValue());
          } else if (item.getNodeName().equals("delay-before-dispatch")) {
@@ -1147,7 +1161,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       }
 
       return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setUser(user)
-                                         .setExclusive(exclusive).setLastValue(lastValue).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch);
+                                         .setExclusive(exclusive).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch);
    }
 
    protected CoreAddressConfiguration parseAddressConfiguration(final Node node) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index ec6b0df..716a733 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -668,6 +668,13 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             }
             output.append(", purgeOnNoConsumers=").append(queue.isPurgeOnNoConsumers());
             output.append(", autoCreateAddress=").append(queue.isAutoCreated());
+            output.append(", exclusive=").append(queue.isExclusive());
+            output.append(", lastValue=").append(queue.isLastValue());
+            output.append(", lastValueKey=").append(queue.getLastValueKey());
+            output.append(", nonDestructive=").append(queue.isNonDestructive());
+            output.append(", consumersBeforeDispatch=").append(queue.getConsumersBeforeDispatch());
+            output.append(", delayBeforeDispatch=").append(queue.getDelayBeforeDispatch());
+
             output.append(']');
             return output;
          }
@@ -807,7 +815,21 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              boolean purgeOnNoConsumers,
                              boolean autoCreateAddress) throws Exception {
       AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address);
-      return createQueue(address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress);
+      return createQueue(
+              address,
+              routingType,
+              name,
+              filterStr,
+              durable,
+              maxConsumers,
+              purgeOnNoConsumers,
+              addressSettings.isDefaultExclusiveQueue(),
+              addressSettings.isDefaultLastValueQueue(),
+              addressSettings.getDefaultLastValueKey() == null ? null : addressSettings.getDefaultLastValueKey().toString(),
+              addressSettings.isDefaultNonDestructive(),
+              addressSettings.getDefaultConsumersBeforeDispatch(),
+              addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress
+      );
    }
 
    @Override
@@ -820,6 +842,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              boolean purgeOnNoConsumers,
                              boolean exclusive,
                              boolean lastValue,
+                             String lastValueKey,
+                             boolean nonDestructive,
                              int consumersBeforeDispatch,
                              long delayBeforeDispatch,
                              boolean autoCreateAddress) throws Exception {
@@ -833,7 +857,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
+         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
          return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
       } catch (ActiveMQException e) {
          throw new IllegalStateException(e.getMessage());
@@ -868,7 +892,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
                              String user) throws Exception {
-      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, null, user);
    }
 
    @Override
@@ -878,6 +902,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Integer maxConsumers,
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
+                             Boolean nonDestructive,
                              Integer consumersBeforeDispatch,
                              Long delayBeforeDispatch,
                              String user) throws Exception {
@@ -886,7 +911,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       clearIO();
 
       try {
-         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
+         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
          if (queue == null) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 081f7da..1534d05 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -20,6 +20,7 @@ import java.lang.ref.WeakReference;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -332,6 +333,15 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
    }
 
    @Override
+   public SimpleString getLastValueProperty() {
+      SimpleString lastValue = getMessage().getSimpleStringProperty(getQueue().getLastValueKey());
+      if (lastValue == null) {
+         lastValue = getMessage().getLastValueProperty();
+      }
+      return lastValue;
+   }
+
+   @Override
    public long getPersistentSize() {
       if (messageSize == -1) {
          try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 9d7bb7e..4caa0e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -66,6 +66,14 @@ public interface QueueBindingInfo {
 
    void setLastValue(boolean lastValue);
 
+   SimpleString getLastValueKey();
+
+   void setLastValueKey(SimpleString lastValueKey);
+
+   boolean isNonDestructive();
+
+   void setNonDestructive(boolean nonDestructive);
+
    int getConsumersBeforeDispatch();
 
    void setConsumersBeforeDispatch(int consumersBeforeDispatch);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 6b7b116..4eaa08d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1293,7 +1293,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
+      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
 
       readLock();
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index a7d5216..570a7fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -50,6 +50,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public boolean lastValue;
 
+   public SimpleString lastValueKey;
+
+   public boolean nonDestructive;
+
    public int consumersBeforeDispatch;
 
    public long delayBeforeDispatch;
@@ -82,6 +86,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          exclusive +
          ", lastValue=" +
          lastValue +
+         ", lastValueKey=" +
+         lastValueKey +
+         ", nonDestructive=" +
+         nonDestructive +
          ", consumersBeforeDispatch=" +
          consumersBeforeDispatch +
          ", delayBeforeDispatch=" +
@@ -102,6 +110,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
                                          final boolean purgeOnNoConsumers,
                                          final boolean exclusive,
                                          final boolean lastValue,
+                                         final SimpleString lastValueKey,
+                                         final boolean nonDestructive,
                                          final int consumersBeforeDispatch,
                                          final long delayBeforeDispatch,
                                          final byte routingType,
@@ -115,6 +125,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
       this.lastValue = lastValue;
+      this.lastValueKey = lastValueKey;
+      this.nonDestructive = nonDestructive;
       this.consumersBeforeDispatch = consumersBeforeDispatch;
       this.delayBeforeDispatch = delayBeforeDispatch;
       this.routingType = routingType;
@@ -224,6 +236,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public SimpleString getLastValueKey() {
+      return lastValueKey;
+   }
+
+   @Override
+   public void setLastValueKey(SimpleString lastValueKey) {
+      this.lastValueKey = lastValueKey;
+   }
+
+   @Override
+   public boolean isNonDestructive() {
+      return nonDestructive;
+   }
+
+   @Override
+   public void setNonDestructive(boolean nonDestructive) {
+      this.nonDestructive = nonDestructive;
+   }
+
+   @Override
    public int getConsumersBeforeDispatch() {
       return consumersBeforeDispatch;
    }
@@ -309,6 +341,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       } else {
          configurationManaged = false;
       }
+      if (buffer.readableBytes() > 0) {
+         lastValueKey = buffer.readNullableSimpleString();
+      } else {
+         lastValueKey = ActiveMQDefaultConfiguration.getDefaultLastValueKey();
+      }
+      if (buffer.readableBytes() > 0) {
+         nonDestructive = buffer.readBoolean();
+      } else {
+         nonDestructive = ActiveMQDefaultConfiguration.getDefaultNonDestructive();
+      }
    }
 
    @Override
@@ -326,6 +368,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       buffer.writeInt(consumersBeforeDispatch);
       buffer.writeLong(delayBeforeDispatch);
       buffer.writeBoolean(configurationManaged);
+      buffer.writeNullableSimpleString(lastValueKey);
+      buffer.writeBoolean(nonDestructive);
    }
 
    @Override
@@ -340,6 +384,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          DataConstants.SIZE_BOOLEAN +
          DataConstants.SIZE_INT +
          DataConstants.SIZE_LONG +
+         DataConstants.SIZE_BOOLEAN +
+         SimpleString.sizeofNullableString(lastValueKey) +
          DataConstants.SIZE_BOOLEAN;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 6ed91b4..b77e341 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -71,6 +71,7 @@ public interface PostOffice extends ActiveMQComponent {
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
+                            Boolean nonDestructive,
                             Integer consumersBeforeDispatch,
                             Long delayBeforeDispatch,
                             SimpleString user,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index ec451f7..35995eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -469,6 +469,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
+                                   Boolean nonDestructive,
                                    Integer consumersBeforeDispatch,
                                    Long delayBeforeDispatch,
                                    SimpleString user,
@@ -516,6 +517,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             changed = true;
             queue.setExclusive(exclusive);
          }
+         if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
+            changed = true;
+            queue.setNonDestructive(nonDestructive);
+         }
          if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
             changed = true;
             queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 16a87d8..3b68d8b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -362,7 +362,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
                   requiresResponse = request.isRequiresResponse();
                   session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
-                                      request.isExclusive(), request.isLastValue(), request.isAutoCreated());
+                                      request.isExclusive(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(), request.isAutoCreated());
                   if (requiresResponse) {
                      response = createNullResponseMessage(packet);
                   }
@@ -385,7 +385,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = request.isRequiresResponse();
                   QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
                   if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
-                     session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
+                     session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
+                                               request.isExclusive(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch());
                   }
                   if (requiresResponse) {
                      response = createNullResponseMessage(packet);
@@ -432,13 +433,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                      if (!queueNames.isEmpty()) {
                         final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
                         if (convertedQueueNames != queueNames) {
-                           result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue());
+                           result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
                         }
                      }
                   }
 
                   if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
-                     response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue());
+                     response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
                   } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
                      response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses());
                   } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 488c6fd..c50315c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -403,7 +403,7 @@ public interface ActiveMQServer extends ServiceComponent {
 
    void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
                           SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue,
-                          int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
+                          SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      boolean durable, boolean temporary) throws Exception;
@@ -417,7 +417,7 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive,
-                     boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+                     boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
@@ -433,8 +433,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
-                     Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, Integer consumersBeforeDispatch,
-                     Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+                     Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
+                     Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
@@ -446,8 +446,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
-                     boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch,
-                     long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+                     boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
+                     int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
 
    @Deprecated
    Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
@@ -541,6 +541,7 @@ public interface ActiveMQServer extends ServiceComponent {
                      Integer maxConsumers,
                      Boolean purgeOnNoConsumers,
                      Boolean exclusive,
+                     Boolean nonDestructive,
                      Integer consumersBeforeDispatch,
                      Long delayBeforeDispatch,
                      String user) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
index a76812c..812f2c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
@@ -41,6 +41,14 @@ public class BindingQueryResult {
 
    private boolean defaultLastValue;
 
+   private SimpleString defaultLastValueKey;
+
+   private Boolean defaultNonDestructive;
+
+   private Integer defaultConsumersBeforeDispatch;
+
+   private Long defaultDelayBeforeDispatch;
+
    public BindingQueryResult(final boolean exists,
                              final AddressInfo addressInfo,
                              final List<SimpleString> queueNames,
@@ -49,7 +57,11 @@ public class BindingQueryResult {
                              final boolean defaultPurgeOnNoConsumers,
                              final int defaultMaxConsumers,
                              final boolean defaultExclusive,
-                             final boolean defaultLastValue) {
+                             final boolean defaultLastValue,
+                             final SimpleString defaultLastValueKey,
+                             final Boolean defaultNonDestructive,
+                             final Integer defaultConsumersBeforeDispatch,
+                             final Long defaultDelayBeforeDispatch) {
       this.addressInfo = addressInfo;
 
       this.exists = exists;
@@ -67,6 +79,14 @@ public class BindingQueryResult {
       this.defaultExclusive = defaultExclusive;
 
       this.defaultLastValue = defaultLastValue;
+
+      this.defaultLastValueKey = defaultLastValueKey;
+
+      this.defaultNonDestructive = defaultNonDestructive;
+
+      this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch;
+
+      this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch;
    }
 
    public boolean isExists() {
@@ -104,4 +124,20 @@ public class BindingQueryResult {
    public boolean isDefaultLastValue() {
       return defaultLastValue;
    }
+
+   public SimpleString getDefaultLastValueKey() {
+      return defaultLastValueKey;
+   }
+
+   public Boolean isDefaultNonDestructive() {
+      return defaultNonDestructive;
+   }
+
+   public Integer getDefaultConsumersBeforeDispatch() {
+      return defaultConsumersBeforeDispatch;
+   }
+
+   public Long getDefaultDelayBeforeDispatch() {
+      return defaultDelayBeforeDispatch;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 48a589f..2e2fb8d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,6 +42,8 @@ public interface MessageReference {
 
    long getMessageID();
 
+   SimpleString getLastValueProperty();
+
    /**
     * We define this method aggregation here because on paging we need to hold the original estimate,
     * so we need to perform some extra steps on paging.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index a8f1095..f2fd8f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -90,6 +90,12 @@ public interface Queue extends Bindable,CriticalComponent {
 
    boolean isLastValue();
 
+   SimpleString getLastValueKey();
+
+   boolean isNonDestructive();
+
+   void setNonDestructive(boolean nonDestructive);
+
    int getMaxConsumers();
 
    void setMaxConsumer(int maxConsumers);
@@ -104,7 +110,7 @@ public interface Queue extends Bindable,CriticalComponent {
 
    int getConsumerCount();
 
-   /**
+    /**
     * This will set a reference counter for every consumer present on the queue.
     * The ReferenceCounter will know what to do when the counter became zeroed.
     * This is used to control what to do with temporary queues, especially
@@ -227,7 +233,7 @@ public interface Queue extends Bindable,CriticalComponent {
    int deleteMatchingReferences(Filter filter) throws Exception;
 
    default int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
-      return deleteMatchingReferences(flushLImit, filter, AckReason.NORMAL);
+      return deleteMatchingReferences(flushLImit, filter, AckReason.KILLED);
    }
 
    int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index c79114d..e682891 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -43,6 +43,8 @@ public final class QueueConfig {
    private final int consumersBeforeDispatch;
    private final long delayBeforeDispatch;
    private final boolean configurationManaged;
+   private final SimpleString lastValueKey;
+   private final boolean nonDestructive;
 
    public static final class Builder {
 
@@ -59,6 +61,8 @@ public final class QueueConfig {
       private int maxConsumers;
       private boolean exclusive;
       private boolean lastValue;
+      private SimpleString lastValueKey;
+      private boolean nonDestructive;
       private boolean purgeOnNoConsumers;
       private int consumersBeforeDispatch;
       private long delayBeforeDispatch;
@@ -82,6 +86,8 @@ public final class QueueConfig {
          this.maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
          this.exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
          this.lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
+         this.lastValueKey = ActiveMQDefaultConfiguration.getDefaultLastValueKey();
+         this.nonDestructive = ActiveMQDefaultConfiguration.getDefaultNonDestructive();
          this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
          this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
          this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
@@ -152,6 +158,16 @@ public final class QueueConfig {
          return this;
       }
 
+      public Builder lastValueKey(SimpleString lastValueKey) {
+         this.lastValueKey = lastValueKey;
+         return this;
+      }
+
+      public Builder nonDestructive(boolean nonDestructive) {
+         this.nonDestructive = nonDestructive;
+         return this;
+      }
+
       public Builder consumersBeforeDispatch(final int consumersBeforeDispatch) {
          this.consumersBeforeDispatch = consumersBeforeDispatch;
          return this;
@@ -193,7 +209,7 @@ public final class QueueConfig {
          } else {
             pageSubscription = null;
          }
-         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
+         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
       }
 
    }
@@ -239,6 +255,8 @@ public final class QueueConfig {
                        final int maxConsumers,
                        final boolean exclusive,
                        final boolean lastValue,
+                       final SimpleString lastValueKey,
+                       final boolean nonDestructive,
                        final int consumersBeforeDispatch,
                        final long delayBeforeDispatch,
                        final boolean purgeOnNoConsumers,
@@ -256,6 +274,8 @@ public final class QueueConfig {
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
       this.lastValue = lastValue;
+      this.lastValueKey = lastValueKey;
+      this.nonDestructive = nonDestructive;
       this.maxConsumers = maxConsumers;
       this.consumersBeforeDispatch = consumersBeforeDispatch;
       this.delayBeforeDispatch = delayBeforeDispatch;
@@ -314,6 +334,14 @@ public final class QueueConfig {
       return lastValue;
    }
 
+   public SimpleString lastValueKey() {
+      return lastValueKey;
+   }
+
+   public boolean isNonDestructive() {
+      return nonDestructive;
+   }
+
    public RoutingType deliveryMode() {
       return routingType;
    }
@@ -363,6 +391,10 @@ public final class QueueConfig {
          return false;
       if (lastValue != that.lastValue)
          return false;
+      if (lastValueKey != null ? !lastValueKey.equals(that.lastValueKey) : that.lastValueKey != null)
+         return false;
+      if (nonDestructive != that.nonDestructive)
+         return false;
       if (purgeOnNoConsumers != that.purgeOnNoConsumers)
          return false;
       if (consumersBeforeDispatch != that.consumersBeforeDispatch)
@@ -392,6 +424,8 @@ public final class QueueConfig {
       result = 31 * result + maxConsumers;
       result = 31 * result + (exclusive ? 1 : 0);
       result = 31 * result + (lastValue ? 1 : 0);
+      result = 31 * result + (lastValueKey != null ? lastValueKey.hashCode() : 0);
+      result = 31 * result + (nonDestructive ? 1 : 0);
       result = 31 * result + consumersBeforeDispatch;
       result = 31 * result + Long.hashCode(delayBeforeDispatch);
       result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
@@ -415,6 +449,8 @@ public final class QueueConfig {
          + ", maxConsumers=" + maxConsumers
          + ", exclusive=" + exclusive
          + ", lastValue=" + lastValue
+         + ", lastValueKey=" + lastValueKey
+         + ", nonDestructive=" + nonDestructive
          + ", consumersBeforeDispatch=" + consumersBeforeDispatch
          + ", delayBeforeDispatch=" + delayBeforeDispatch
          + ", purgeOnNoConsumers=" + purgeOnNoConsumers

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 6d72088..37442b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -167,6 +167,22 @@ public interface ServerSession extends SecurityAuth {
                      SimpleString filterString,
                      boolean temporary,
                      boolean durable,
+                     int maxConsumers,
+                     boolean purgeOnNoConsumers,
+                     Boolean exclusive,
+                     Boolean lastValue,
+                     SimpleString lastValueKey,
+                     Boolean nonDestructive,
+                     Integer consumersBeforeDispatch,
+                     Long delayBeforeDispatch,
+                     boolean autoCreated) throws Exception;
+
+   Queue createQueue(SimpleString address,
+                     SimpleString name,
+                     RoutingType routingType,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable,
                      boolean autoCreated) throws Exception;
 
    Queue createQueue(AddressInfo addressInfo,
@@ -289,6 +305,20 @@ public interface ServerSession extends SecurityAuth {
    void createSharedQueue(SimpleString address,
                           SimpleString name,
                           RoutingType routingType,
+                          SimpleString filterString,
+                          boolean durable,
+                          Integer maxConsumers,
+                          Boolean purgeOnNoConsumers,
+                          Boolean exclusive,
+                          Boolean lastValue,
+                          SimpleString lastValueKey,
+                          Boolean nonDestructive,
+                          Integer consumersBeforeDispatch,
+                          Long delayBeforeDispatch) throws Exception;
+
+   void createSharedQueue(SimpleString address,
+                          SimpleString name,
+                          RoutingType routingType,
                           boolean durable,
                           SimpleString filterString) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
index 06b3d85..164f141 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
@@ -18,5 +18,5 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 public enum AckReason {
-   KILLED, EXPIRED, NORMAL
+   KILLED, EXPIRED, NORMAL, REPLACED
 }
\ No newline at end of file