You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/10/18 00:53:22 UTC
[3/5] activemq-artemis git commit: ARTEMIS-2117 Add custom LVQ Key and Non Destructive Queue into Broker
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index b09d310..b863181 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -51,6 +51,14 @@ public class QueueQueryResult {
private Boolean lastValue;
+ private SimpleString lastValueKey;
+
+ private Boolean nonDestructive;
+
+ private Integer consumersBeforeDispatch;
+
+ private Long delayBeforeDispatch;
+
private Integer defaultConsumerWindowSize;
public QueueQueryResult(final SimpleString name,
@@ -68,6 +76,10 @@ public class QueueQueryResult {
final int maxConsumers,
final Boolean exclusive,
final Boolean lastValue,
+ final SimpleString lastValueKey,
+ final Boolean nonDestructive,
+ final Integer consumersBeforeDispatch,
+ final Long delayBeforeDispatch,
final Integer defaultConsumerWindowSize) {
this.durable = durable;
@@ -99,6 +111,14 @@ public class QueueQueryResult {
this.lastValue = lastValue;
+ this.lastValueKey = lastValueKey;
+
+ this.nonDestructive = nonDestructive;
+
+ this.consumersBeforeDispatch = consumersBeforeDispatch;
+
+ this.delayBeforeDispatch = delayBeforeDispatch;
+
this.defaultConsumerWindowSize = defaultConsumerWindowSize;
}
@@ -166,6 +186,22 @@ public class QueueQueryResult {
return lastValue;
}
+ public SimpleString getLastValueKey() {
+ return lastValueKey;
+ }
+
+ public Boolean isNonDestructive() {
+ return nonDestructive;
+ }
+
+ public Integer getConsumersBeforeDispatch() {
+ return consumersBeforeDispatch;
+ }
+
+ public Long getDelayBeforeDispatch() {
+ return delayBeforeDispatch;
+ }
+
public Integer getDefaultConsumerWindowSize() {
return defaultConsumerWindowSize;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
index 0d86354..e25441b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
@@ -186,6 +187,19 @@ public abstract class SessionContext {
Boolean exclusive,
Boolean lastValue) throws ActiveMQException;
+ /**
+ * Creates a shared queue using the routing type set by the Address. If the Address supports more than one type of delivery
+ * then the default delivery mode (MULTICAST) is used.
+ *
+ * @param address
+ * @param queueName
+ * @param queueAttributes
+ * @throws ActiveMQException
+ */
+ public abstract void createSharedQueue(SimpleString address,
+ SimpleString queueName,
+ QueueAttributes queueAttributes) throws ActiveMQException;
+
public abstract void createSharedQueue(SimpleString address,
SimpleString queueName,
RoutingType routingType,
@@ -235,6 +249,12 @@ public abstract class SessionContext {
Boolean exclusive,
Boolean lastVale) throws ActiveMQException;
+ public abstract void createQueue(SimpleString address,
+ SimpleString queueName,
+ boolean temp,
+ boolean autoCreated,
+ QueueAttributes queueAttributes) throws ActiveMQException;
+
public abstract ClientSession.QueueQuery queueQuery(SimpleString queueName) throws ActiveMQException;
public abstract void forceDelivery(ClientConsumer consumer, long sequence) throws ActiveMQException;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index ee4223c..74f39ee 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
-import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
@@ -61,7 +60,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
private final SimpleString connID;
private final ClientProducer clientProducer;
- private final ClientSession clientSession;
+ private final ActiveMQSession session;
private boolean disableMessageID = false;
@@ -78,7 +77,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
protected ActiveMQMessageProducer(final ActiveMQConnection connection,
final ClientProducer producer,
final ActiveMQDestination defaultDestination,
- final ClientSession clientSession,
+ final ActiveMQSession session,
final ConnectionFactoryOptions options) throws JMSException {
this.options = options;
this.connection = connection;
@@ -89,7 +88,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
this.defaultDestination = defaultDestination;
- this.clientSession = clientSession;
+ this.session = session;
}
// MessageProducer implementation --------------------------------
@@ -388,6 +387,7 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
}
SimpleString address = null;
+ ClientSession clientSession = session.getCoreSession();
if (destination == null) {
if (defaultDestination == null) {
@@ -413,9 +413,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
clientSession.createAddress(address, RoutingType.ANYCAST, true);
if (destination.isTemporary()) {
// TODO is it right to use the address for the queue name here?
- clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
+ session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
} else {
- createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers(), query.isDefaultExclusive(), query.isDefaultLastValueQueue());
+ session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
}
} else if (!destination.isQueue() && query.isAutoCreateAddresses()) {
clientSession.createAddress(address, RoutingType.MULTICAST, true);
@@ -428,9 +428,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
connection.addKnownDestination(address);
} else if (destination.isQueue() && query.isAutoCreateQueues()) {
if (destination.isTemporary()) {
- clientSession.createTemporaryQueue(address, RoutingType.ANYCAST, address);
+ session.createTemporaryQueue(destination, RoutingType.ANYCAST, address, null, query);
} else {
- createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query.getDefaultMaxConsumers(), query.isDefaultPurgeOnNoConsumers(), query.isDefaultExclusive(), query.isDefaultLastValueQueue());
+ session.createQueue(destination, RoutingType.ANYCAST, address, null, true, true, query);
}
}
}
@@ -450,7 +450,6 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
if (!(jmsMessage instanceof ActiveMQMessage)) {
// JMS 1.1 Sect. 3.11.4: A provider must be prepared to accept, from a client,
// a message whose implementation is not one of its own.
-
if (jmsMessage instanceof BytesMessage) {
activeMQJmsMessage = new ActiveMQBytesMessage((BytesMessage) jmsMessage, clientSession);
} else if (jmsMessage instanceof MapMessage) {
@@ -533,29 +532,10 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
}
private void checkClosed() throws JMSException {
- if (clientProducer.isClosed() || clientSession.isClosed()) {
+ if (clientProducer.isClosed()) {
throw new IllegalStateException("Producer is closed");
}
- }
-
- private void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
- QueueAttributes queueAttributes = destination.getQueueAttributes();
- if (queueAttributes == null) {
- clientSession.createQueue(destination.getSimpleAddress(), routingType, queueName, filter, durable, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
- } else {
- clientSession.createQueue(
- destination.getSimpleAddress(),
- routingType,
- queueName,
- filter,
- durable,
- autoCreated,
- queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
- queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
- queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
- queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
- );
- }
+ session.checkClosed();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index 95d3608..5b75f7f 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -374,7 +374,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (jbd.isQueue() && response.isAutoCreateQueues()) {
// perhaps just relying on the broker to do it is simplest (i.e. purgeOnNoConsumers)
session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true);
- createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response);
} else if (!jbd.isQueue() && response.isAutoCreateAddresses()) {
session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true);
} else {
@@ -389,7 +389,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
- return new ActiveMQMessageProducer(connection, producer, jbd, session, options);
+ return new ActiveMQMessageProducer(connection, producer, jbd, this, options);
} catch (ActiveMQException e) {
throw JMSExceptionHelper.convertFromActiveMQException(e);
}
@@ -699,9 +699,9 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (!(subResponse.isExists() && Objects.equals(subResponse.getAddress(), dest.getSimpleAddress()) && Objects.equals(subResponse.getFilterString(), coreFilterString))) {
try {
if (durability == ConsumerDurability.DURABLE) {
- createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response);
} else {
- createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response);
}
} catch (ActiveMQQueueExistsException ignored) {
// We ignore this because querying and then creating the queue wouldn't be idempotent
@@ -768,7 +768,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) {
if (response.isAutoCreateQueues()) {
try {
- createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response);
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
}
@@ -802,7 +802,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
queueName = new SimpleString(UUID.randomUUID().toString());
- createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
consumer = session.createConsumer(queueName, null, false);
@@ -825,7 +825,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
if (!subResponse.isExists()) {
// durable subscription queues are not technically considered to be auto-created
- createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response);
} else {
// Already exists
if (subResponse.getConsumerCount() > 0) {
@@ -856,7 +856,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
session.deleteQueue(queueName);
// Create the new one
- createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, false, response);
}
}
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
AddressQuery response = session.addressQuery(new SimpleString(activeMQDestination.getAddress()));
if (!response.isExists()) {
if (response.isAutoCreateQueues()) {
- createQueue(activeMQDestination, RoutingType.ANYCAST, activeMQDestination.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue());
+ createQueue(activeMQDestination, RoutingType.ANYCAST, activeMQDestination.getSimpleAddress(), null, true, true, response);
} else {
throw new InvalidDestinationException("Destination " + activeMQDestination.getName() + " does not exist");
}
@@ -1192,14 +1192,45 @@ public class ActiveMQSession implements QueueSession, TopicSession {
// Protected -----------------------------------------------------
- // Private -------------------------------------------------------
- private void checkClosed() throws JMSException {
+ void checkClosed() throws JMSException {
if (session.isClosed()) {
throw new IllegalStateException("Session is closed");
}
}
+ void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
+ QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+ setRequiredQueueAttributesIfNotSet(queueAttributes, addressQuery, routingType, filter, false);
+ session.createTemporaryQueue(
+ destination.getSimpleAddress(),
+ queueName,
+ queueAttributes
+ );
+ }
+
+ void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
+ QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+ setRequiredQueueAttributesIfNotSet(queueAttributes, addressQuery, routingType, filter, durable);
+ session.createSharedQueue(
+ destination.getSimpleAddress(),
+ queueName,
+ queueAttributes);
+ }
+
+ void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, ClientSession.AddressQuery addressQuery) throws ActiveMQException {
+ QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
+ setRequiredQueueAttributesIfNotSet(queueAttributes, addressQuery, routingType, filter, durable);
+ session.createQueue(
+ destination.getSimpleAddress(),
+ queueName,
+ autoCreated,
+ queueAttributes);
+ }
+
+ // Private -------------------------------------------------------
+
+
private ActiveMQQueue lookupQueue(final String queueName, boolean isTemporary) throws ActiveMQException {
String queueNameToUse = queueName;
if (enable1xPrefixes) {
@@ -1246,63 +1277,34 @@ public class ActiveMQSession implements QueueSession, TopicSession {
}
}
- private void createTemporaryQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
- QueueAttributes queueAttributes = destination.getQueueAttributes();
- if (queueAttributes == null) {
- session.createTemporaryQueue(destination.getSimpleAddress(), routingType, queueName, filter, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
- } else {
- session.createTemporaryQueue(
- destination.getSimpleAddress(),
- routingType,
- queueName,
- filter,
- queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
- queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
- queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
- queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
- );
- }
- }
-
- private void createSharedQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
- QueueAttributes queueAttributes = destination.getQueueAttributes();
- if (queueAttributes == null) {
- session.createSharedQueue(destination.getSimpleAddress(), routingType, queueName, filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
- } else {
- session.createSharedQueue(
- destination.getSimpleAddress(),
- routingType,
- queueName,
- filter,
- durable,
- queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
- queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
- queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
- queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
- );
- }
- }
-
- private void createQueue(ActiveMQDestination destination, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue) throws ActiveMQException {
- QueueAttributes queueAttributes = destination.getQueueAttributes();
- if (queueAttributes == null) {
- session.createQueue(destination.getSimpleAddress(), routingType, queueName, filter, durable, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue);
- } else {
- session.createQueue(
- destination.getSimpleAddress(),
- routingType,
- queueName,
- filter,
- durable,
- autoCreated,
- queueAttributes.getMaxConsumers() == null ? maxConsumers : queueAttributes.getMaxConsumers(),
- queueAttributes.getPurgeOnNoConsumers() == null ? purgeOnNoConsumers : queueAttributes.getPurgeOnNoConsumers(),
- queueAttributes.getExclusive() == null ? exclusive : queueAttributes.getExclusive(),
- queueAttributes.getLastValue() == null ? lastValue : queueAttributes.getLastValue()
- );
+ /**
+ * Set the non nullable (CreateQueueMessage_V2) queue attributes (all others get defaulted if null by address settings server side).
+ *
+ * @param queueAttributes the provided queue attributes the client wants to set
+ * @param addressQuery the address settings query information (this could be removed if max consumers and purge on no consumers were null-able in CreateQueueMessage_V2)
+ * @param routingType of the queue (multicast or anycast)
+ * @param filter to apply on the queue
+ * @param durable if queue is durable
+ */
+ private void setRequiredQueueAttributesIfNotSet(QueueAttributes queueAttributes, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) {
+ if (queueAttributes.getRoutingType() == null) {
+ queueAttributes.setRoutingType(routingType);
+ }
+ if (queueAttributes.getFilterString() == null) {
+ queueAttributes.setFilterString(filter);
+ }
+ if (queueAttributes.getDurable() == null) {
+ queueAttributes.setDurable(durable);
+ }
+ if (queueAttributes.getMaxConsumers() == null) {
+ queueAttributes.setMaxConsumers(addressQuery.getDefaultMaxConsumers());
+ }
+ if (queueAttributes.getPurgeOnNoConsumers() == null) {
+ queueAttributes.setPurgeOnNoConsumers(addressQuery.isDefaultPurgeOnNoConsumers());
}
}
+
// Inner classes -------------------------------------------------
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
index f06772c..235d699 100644
--- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
+++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientSessionContext.java
@@ -72,7 +72,7 @@ public class HornetQClientSessionContext extends ActiveMQSessionContext {
public ClientSession.AddressQuery addressQuery(final SimpleString address) throws ActiveMQException {
SessionBindingQueryResponseMessage response = (SessionBindingQueryResponseMessage) getSessionChannel().sendBlocking(new SessionBindingQueryMessage(address), PacketImpl.SESS_BINDINGQUERY_RESP);
- return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue());
+ return new AddressQueryImpl(response.isExists(), response.getQueueNames(), false, false, ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultLastValue(), ActiveMQDefaultConfiguration.getDefaultLastValueKey(), ActiveMQDefaultConfiguration.getDefaultNonDestructive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch());
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index fae6ef7..e522f37 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -293,7 +293,9 @@ public class AMQConsumer {
}
boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists
-
+ if (serverConsumer.getQueue().isNonDestructive()) {
+ removeReferences = false;
+ }
if (ack.isRedeliveredAck() || ack.isDeliveredAck() || ack.isExpiredAck()) {
removeReferences = false;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index 87e938e..50fee8e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -39,6 +39,10 @@ public class CoreQueueConfiguration implements Serializable {
private Boolean lastValue;
+ private String lastValueKey;
+
+ private Boolean nonDestructive;
+
private Integer maxConsumers;
private Integer consumersBeforeDispatch;
@@ -80,6 +84,14 @@ public class CoreQueueConfiguration implements Serializable {
return lastValue;
}
+ public String getLastValueKey() {
+ return lastValueKey;
+ }
+
+ public Boolean isNonDestructive() {
+ return nonDestructive;
+ }
+
public Integer getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}
@@ -170,6 +182,16 @@ public class CoreQueueConfiguration implements Serializable {
return this;
}
+ public CoreQueueConfiguration setLastValueKey(String lastValueKey) {
+ this.lastValueKey = lastValueKey;
+ return this;
+ }
+
+ public CoreQueueConfiguration setNonDestructive(Boolean nonDestructive) {
+ this.nonDestructive = nonDestructive;
+ return this;
+ }
+
public boolean getPurgeOnNoConsumers() {
return purgeOnNoConsumers;
}
@@ -199,6 +221,8 @@ public class CoreQueueConfiguration implements Serializable {
result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode());
result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
+ result = prime * result + ((lastValueKey == null) ? 0 : lastValueKey.hashCode());
+ result = prime * result + ((nonDestructive == null) ? 0 : nonDestructive.hashCode());
result = prime * result + ((consumersBeforeDispatch == null) ? 0 : consumersBeforeDispatch.hashCode());
result = prime * result + ((delayBeforeDispatch == null) ? 0 : delayBeforeDispatch.hashCode());
result = prime * result + ((routingType == null) ? 0 : routingType.hashCode());
@@ -254,6 +278,18 @@ public class CoreQueueConfiguration implements Serializable {
} else if (!lastValue.equals(other.lastValue)) {
return false;
}
+ if (lastValueKey == null) {
+ if (other.lastValueKey != null)
+ return false;
+ } else if (!lastValueKey.equals(other.lastValueKey)) {
+ return false;
+ }
+ if (nonDestructive == null) {
+ if (other.nonDestructive != null)
+ return false;
+ } else if (!nonDestructive.equals(other.nonDestructive)) {
+ return false;
+ }
if (consumersBeforeDispatch == null) {
if (other.consumersBeforeDispatch != null)
return false;
@@ -287,6 +323,8 @@ public class CoreQueueConfiguration implements Serializable {
", purgeOnNoConsumers=" + purgeOnNoConsumers +
", exclusive=" + exclusive +
", lastValue=" + lastValue +
+ ", lastValueKey=" + lastValueKey +
+ ", nonDestructive=" + nonDestructive +
", consumersBeforeDispatch=" + consumersBeforeDispatch +
", delayBeforeDispatch=" + delayBeforeDispatch +
"]";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 9bc292b..5b0a3a4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -181,6 +181,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String DEFAULT_LVQ_NODE_NAME = "default-last-value-queue";
+ private static final String DEFAULT_LVQ_KEY_NODE_NAME = "default-last-value-key";
+
+ private static final String DEFAULT_NON_DESTRUCTIVE_NODE_NAME = "default-non-destructive";
+
private static final String DEFAULT_EXCLUSIVE_NODE_NAME = "default-exclusive-queue";
private static final String DEFAULT_CONSUMERS_BEFORE_DISPATCH = "default-consumers-before-dispatch";
@@ -1000,6 +1004,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
addressSettings.setAddressFullMessagePolicy(policy);
} else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDefaultLastValueQueue(XMLUtil.parseBoolean(child));
+ } else if (DEFAULT_LVQ_KEY_NODE_NAME.equalsIgnoreCase(name)) {
+ addressSettings.setDefaultLastValueKey(SimpleString.toSimpleString(getTrimmedTextContent(child)));
+ } else if (DEFAULT_NON_DESTRUCTIVE_NODE_NAME.equalsIgnoreCase(name)) {
+ addressSettings.setDefaultNonDestructive(XMLUtil.parseBoolean(child));
} else if (DEFAULT_EXCLUSIVE_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setDefaultExclusiveQueue(XMLUtil.parseBoolean(child));
} else if (MAX_DELIVERY_ATTEMPTS.equalsIgnoreCase(name)) {
@@ -1109,6 +1117,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String user = null;
Boolean exclusive = null;
Boolean lastValue = null;
+ String lastValueKey = null;
+ Boolean nonDestructive = null;
Integer consumersBeforeDispatch = null;
Long delayBeforeDispatch = null;
@@ -1124,6 +1134,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
exclusive = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("last-value")) {
lastValue = Boolean.parseBoolean(item.getNodeValue());
+ } else if (item.getNodeName().equals("last-value-key")) {
+ lastValueKey = item.getNodeValue();
+ } else if (item.getNodeName().equals("non-destructive")) {
+ nonDestructive = Boolean.parseBoolean(item.getNodeValue());
} else if (item.getNodeName().equals("consumers-before-dispatch")) {
consumersBeforeDispatch = Integer.parseInt(item.getNodeValue());
} else if (item.getNodeName().equals("delay-before-dispatch")) {
@@ -1147,7 +1161,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
}
return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setUser(user)
- .setExclusive(exclusive).setLastValue(lastValue).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch);
+ .setExclusive(exclusive).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch);
}
protected CoreAddressConfiguration parseAddressConfiguration(final Node node) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index ec6b0df..716a733 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -668,6 +668,14 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
}
output.append(", purgeOnNoConsumers=").append(queue.isPurgeOnNoConsumers());
output.append(", autoCreateAddress=").append(queue.isAutoCreated());
+ output.append(", exclusive=").append(queue.isExclusive());
+ output.append(", lastValue=").append(queue.isLastValue());
+ output.append(", lastValueKey=").append(queue.getLastValueKey());
+ output.append(", nonDestructive=").append(queue.isNonDestructive());
+ output.append(", consumersBeforeDispatch=").append(queue.getConsumersBeforeDispatch());
+ output.append(", delayBeforeDispatch=").append(queue.getDelayBeforeDispatch());
+ output.append(", autoCreateAddress=").append(queue.isAutoCreated());
+
output.append(']');
return output;
}
@@ -807,7 +815,21 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
boolean purgeOnNoConsumers,
boolean autoCreateAddress) throws Exception {
AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address);
- return createQueue(address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress);
+ return createQueue(
+ address,
+ routingType,
+ name,
+ filterStr,
+ durable,
+ maxConsumers,
+ purgeOnNoConsumers,
+ addressSettings.isDefaultExclusiveQueue(),
+ addressSettings.isDefaultLastValueQueue(),
+ addressSettings.getDefaultLastValueKey() == null ? null : addressSettings.getDefaultLastValueKey().toString(),
+ addressSettings.isDefaultNonDestructive(),
+ addressSettings.getDefaultConsumersBeforeDispatch(),
+ addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress
+ );
}
@Override
@@ -820,6 +842,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
boolean purgeOnNoConsumers,
boolean exclusive,
boolean lastValue,
+ String lastValueKey,
+ boolean nonDestructive,
int consumersBeforeDispatch,
long delayBeforeDispatch,
boolean autoCreateAddress) throws Exception {
@@ -833,7 +857,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
filter = new SimpleString(filterStr);
}
- final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
+ final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
@@ -868,7 +892,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
Boolean purgeOnNoConsumers,
Boolean exclusive,
String user) throws Exception {
- return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user);
+ return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, null, user);
}
@Override
@@ -878,6 +902,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
+ Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
String user) throws Exception {
@@ -886,7 +911,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
clearIO();
try {
- final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user);
+ final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
if (queue == null) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 081f7da..1534d05 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -20,6 +20,7 @@ import java.lang.ref.WeakReference;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -332,6 +333,15 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
}
@Override
+ public SimpleString getLastValueProperty() {
+ SimpleString lastValue = getMessage().getSimpleStringProperty(getQueue().getLastValueKey());
+ if (lastValue == null) {
+ lastValue = getMessage().getLastValueProperty();
+ }
+ return lastValue;
+ }
+
+ @Override
public long getPersistentSize() {
if (messageSize == -1) {
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 9d7bb7e..4caa0e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -66,6 +66,14 @@ public interface QueueBindingInfo {
void setLastValue(boolean lastValue);
+ SimpleString getLastValueKey();
+
+ void setLastValueKey(SimpleString lastValue);
+
+ boolean isNonDestructive();
+
+ void setNonDestructive(boolean nonDestructive);
+
int getConsumersBeforeDispatch();
void setConsumersBeforeDispatch(int consumersBeforeDispatch);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 6b7b116..4eaa08d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1293,7 +1293,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
SimpleString filterString = filter == null ? null : filter.getFilterString();
- PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
+ PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
readLock();
try {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index a7d5216..570a7fc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -50,6 +50,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
public boolean lastValue;
+ public SimpleString lastValueKey;
+
+ public boolean nonDestructive;
+
public int consumersBeforeDispatch;
public long delayBeforeDispatch;
@@ -82,6 +86,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
exclusive +
", lastValue=" +
lastValue +
+ ", lastValueKey=" +
+ lastValueKey +
+ ", nonDestructive=" +
+ nonDestructive +
", consumersBeforeDispatch=" +
consumersBeforeDispatch +
", delayBeforeDispatch=" +
@@ -102,6 +110,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
final boolean purgeOnNoConsumers,
final boolean exclusive,
final boolean lastValue,
+ final SimpleString lastValueKey,
+ final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final byte routingType,
@@ -115,6 +125,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.exclusive = exclusive;
this.lastValue = lastValue;
+ this.lastValueKey = lastValueKey;
+ this.nonDestructive = nonDestructive;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
this.routingType = routingType;
@@ -224,6 +236,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
}
@Override
+ public SimpleString getLastValueKey() {
+ return lastValueKey;
+ }
+
+ @Override
+ public void setLastValueKey(SimpleString lastValueKey) {
+ this.lastValueKey = lastValueKey;
+ }
+
+ @Override
+ public boolean isNonDestructive() {
+ return nonDestructive;
+ }
+
+ @Override
+ public void setNonDestructive(boolean nonDestructive) {
+ this.nonDestructive = nonDestructive;
+ }
+
+ @Override
public int getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}
@@ -309,6 +341,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
} else {
configurationManaged = false;
}
+ if (buffer.readableBytes() > 0) {
+ lastValueKey = buffer.readNullableSimpleString();
+ } else {
+ lastValueKey = ActiveMQDefaultConfiguration.getDefaultLastValueKey();
+ }
+ if (buffer.readableBytes() > 0) {
+ nonDestructive = buffer.readBoolean();
+ } else {
+ nonDestructive = ActiveMQDefaultConfiguration.getDefaultNonDestructive();
+ }
}
@Override
@@ -326,6 +368,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
buffer.writeInt(consumersBeforeDispatch);
buffer.writeLong(delayBeforeDispatch);
buffer.writeBoolean(configurationManaged);
+ buffer.writeNullableSimpleString(lastValueKey);
+ buffer.writeBoolean(nonDestructive);
}
@Override
@@ -340,6 +384,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
DataConstants.SIZE_BOOLEAN +
DataConstants.SIZE_INT +
DataConstants.SIZE_LONG +
+ DataConstants.SIZE_BOOLEAN +
+ SimpleString.sizeofNullableString(lastValueKey) +
DataConstants.SIZE_BOOLEAN;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 6ed91b4..b77e341 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -71,6 +71,7 @@ public interface PostOffice extends ActiveMQComponent {
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
+ Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
SimpleString user,
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index ec451f7..35995eb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -469,6 +469,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
+ Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
SimpleString user,
@@ -516,6 +517,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
changed = true;
queue.setExclusive(exclusive);
}
+ if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
+ changed = true;
+ queue.setNonDestructive(nonDestructive);
+ }
if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) {
changed = true;
queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 16a87d8..3b68d8b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -362,7 +362,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
requiresResponse = request.isRequiresResponse();
session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
- request.isExclusive(), request.isLastValue(), request.isAutoCreated());
+ request.isExclusive(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(), request.isAutoCreated());
if (requiresResponse) {
response = createNullResponseMessage(packet);
}
@@ -385,7 +385,8 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = request.isRequiresResponse();
QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
- session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(), request.isExclusive(), request.isLastValue());
+ session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
+ request.isExclusive(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch());
}
if (requiresResponse) {
response = createNullResponseMessage(packet);
@@ -432,13 +433,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
if (!queueNames.isEmpty()) {
final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
if (convertedQueueNames != queueNames) {
- result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue());
+ result = new BindingQueryResult(result.isExists(), result.getAddressInfo(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
}
}
}
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
- response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue());
+ response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers(), result.isDefaultExclusive(), result.isDefaultLastValue(), result.getDefaultLastValueKey(), result.isDefaultNonDestructive(), result.getDefaultConsumersBeforeDispatch(), result.getDefaultDelayBeforeDispatch());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {
response = new SessionBindingQueryResponseMessage_V3(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V2)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 488c6fd..c50315c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -403,7 +403,7 @@ public interface ActiveMQServer extends ServiceComponent {
void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue,
- int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
+ SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean temporary) throws Exception;
@@ -417,7 +417,7 @@ public interface ActiveMQServer extends ServiceComponent {
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive,
- boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+ boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
@@ -433,8 +433,8 @@ public interface ActiveMQServer extends ServiceComponent {
Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
- Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, Integer consumersBeforeDispatch,
- Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+ Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
+ Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
@@ -446,8 +446,8 @@ public interface ActiveMQServer extends ServiceComponent {
Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
- boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch,
- long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
+ boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
+ int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
@Deprecated
Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
@@ -541,6 +541,7 @@ public interface ActiveMQServer extends ServiceComponent {
Integer maxConsumers,
Boolean purgeOnNoConsumers,
Boolean exclusive,
+ Boolean nonDestructive,
Integer consumersBeforeDispatch,
Long delayBeforeDispatch,
String user) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
index a76812c..812f2c8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/BindingQueryResult.java
@@ -41,6 +41,14 @@ public class BindingQueryResult {
private boolean defaultLastValue;
+ private SimpleString defaultLastValueKey;
+
+ private Boolean defaultNonDestructive;
+
+ private Integer defaultConsumersBeforeDispatch;
+
+ private Long defaultDelayBeforeDispatch;
+
public BindingQueryResult(final boolean exists,
final AddressInfo addressInfo,
final List<SimpleString> queueNames,
@@ -49,7 +57,11 @@ public class BindingQueryResult {
final boolean defaultPurgeOnNoConsumers,
final int defaultMaxConsumers,
final boolean defaultExclusive,
- final boolean defaultLastValue) {
+ final boolean defaultLastValue,
+ final SimpleString defaultLastValueKey,
+ final Boolean defaultNonDestructive,
+ final Integer defaultConsumersBeforeDispatch,
+ final Long defaultDelayBeforeDispatch) {
this.addressInfo = addressInfo;
this.exists = exists;
@@ -67,6 +79,14 @@ public class BindingQueryResult {
this.defaultExclusive = defaultExclusive;
this.defaultLastValue = defaultLastValue;
+
+ this.defaultLastValueKey = defaultLastValueKey;
+
+ this.defaultNonDestructive = defaultNonDestructive;
+
+ this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch;
+
+ this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch;
}
public boolean isExists() {
@@ -104,4 +124,20 @@ public class BindingQueryResult {
public boolean isDefaultLastValue() {
return defaultLastValue;
}
+
+ public SimpleString getDefaultLastValueKey() {
+ return defaultLastValueKey;
+ }
+
+ public Boolean isDefaultNonDestructive() {
+ return defaultNonDestructive;
+ }
+
+ public Integer getDefaultConsumersBeforeDispatch() {
+ return defaultConsumersBeforeDispatch;
+ }
+
+ public Long getDefaultDelayBeforeDispatch() {
+ return defaultDelayBeforeDispatch;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 48a589f..2e2fb8d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -41,6 +42,8 @@ public interface MessageReference {
long getMessageID();
+ SimpleString getLastValueProperty();
+
/**
* We define this method aggregation here because on paging we need to hold the original estimate,
* so we need to perform some extra steps on paging.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index a8f1095..f2fd8f9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -90,6 +90,12 @@ public interface Queue extends Bindable,CriticalComponent {
boolean isLastValue();
+ SimpleString getLastValueKey();
+
+ boolean isNonDestructive();
+
+ void setNonDestructive(boolean nonDestructive);
+
int getMaxConsumers();
void setMaxConsumer(int maxConsumers);
@@ -104,7 +110,7 @@ public interface Queue extends Bindable,CriticalComponent {
int getConsumerCount();
- /**
+ /**
* This will set a reference counter for every consumer present on the queue.
* The ReferenceCounter will know what to do when the counter became zeroed.
* This is used to control what to do with temporary queues, especially
@@ -227,7 +233,7 @@ public interface Queue extends Bindable,CriticalComponent {
int deleteMatchingReferences(Filter filter) throws Exception;
default int deleteMatchingReferences(int flushLImit, Filter filter) throws Exception {
- return deleteMatchingReferences(flushLImit, filter, AckReason.NORMAL);
+ return deleteMatchingReferences(flushLImit, filter, AckReason.KILLED);
}
int deleteMatchingReferences(int flushLImit, Filter filter, AckReason ackReason) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index c79114d..e682891 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -43,6 +43,8 @@ public final class QueueConfig {
private final int consumersBeforeDispatch;
private final long delayBeforeDispatch;
private final boolean configurationManaged;
+ private final SimpleString lastValueKey;
+ private final boolean nonDestructive;
public static final class Builder {
@@ -59,6 +61,8 @@ public final class QueueConfig {
private int maxConsumers;
private boolean exclusive;
private boolean lastValue;
+ private SimpleString lastValueKey;
+ private boolean nonDestructive;
private boolean purgeOnNoConsumers;
private int consumersBeforeDispatch;
private long delayBeforeDispatch;
@@ -82,6 +86,8 @@ public final class QueueConfig {
this.maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
this.exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive();
this.lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue();
+ this.lastValueKey = ActiveMQDefaultConfiguration.getDefaultLastValueKey();
+ this.nonDestructive = ActiveMQDefaultConfiguration.getDefaultNonDestructive();
this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
@@ -152,6 +158,16 @@ public final class QueueConfig {
return this;
}
+ public Builder lastValueKey(SimpleString lastValueKey) {
+ this.lastValueKey = lastValueKey;
+ return this;
+ }
+
+ public Builder nonDestructive(boolean nonDestructive) {
+ this.nonDestructive = nonDestructive;
+ return this;
+ }
+
public Builder consumersBeforeDispatch(final int consumersBeforeDispatch) {
this.consumersBeforeDispatch = consumersBeforeDispatch;
return this;
@@ -193,7 +209,7 @@ public final class QueueConfig {
} else {
pageSubscription = null;
}
- return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
+ return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
}
}
@@ -239,6 +255,8 @@ public final class QueueConfig {
final int maxConsumers,
final boolean exclusive,
final boolean lastValue,
+ final SimpleString lastValueKey,
+ final boolean nonDestructive,
final int consumersBeforeDispatch,
final long delayBeforeDispatch,
final boolean purgeOnNoConsumers,
@@ -256,6 +274,8 @@ public final class QueueConfig {
this.purgeOnNoConsumers = purgeOnNoConsumers;
this.exclusive = exclusive;
this.lastValue = lastValue;
+ this.lastValueKey = lastValueKey;
+ this.nonDestructive = nonDestructive;
this.maxConsumers = maxConsumers;
this.consumersBeforeDispatch = consumersBeforeDispatch;
this.delayBeforeDispatch = delayBeforeDispatch;
@@ -314,6 +334,14 @@ public final class QueueConfig {
return lastValue;
}
+ public SimpleString lastValueKey() {
+ return lastValueKey;
+ }
+
+ public boolean isNonDestructive() {
+ return nonDestructive;
+ }
+
public RoutingType deliveryMode() {
return routingType;
}
@@ -363,6 +391,10 @@ public final class QueueConfig {
return false;
if (lastValue != that.lastValue)
return false;
+ if (lastValueKey != null ? !lastValueKey.equals(that.lastValueKey) : that.lastValueKey != null)
+ return false;
+ if (nonDestructive != that.nonDestructive)
+ return false;
if (purgeOnNoConsumers != that.purgeOnNoConsumers)
return false;
if (consumersBeforeDispatch != that.consumersBeforeDispatch)
@@ -392,6 +424,8 @@ public final class QueueConfig {
result = 31 * result + maxConsumers;
result = 31 * result + (exclusive ? 1 : 0);
result = 31 * result + (lastValue ? 1 : 0);
+ result = 31 * result + (lastValueKey != null ? lastValueKey.hashCode() : 0);
+ result = 31 * result + (nonDestructive ? 1 : 0);
result = 31 * result + consumersBeforeDispatch;
result = 31 * result + Long.hashCode(delayBeforeDispatch);
result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
@@ -415,6 +449,8 @@ public final class QueueConfig {
+ ", maxConsumers=" + maxConsumers
+ ", exclusive=" + exclusive
+ ", lastValue=" + lastValue
+ + ", lastValueKey=" + lastValueKey
+ + ", nonDestructive=" + nonDestructive
+ ", consumersBeforeDispatch=" + consumersBeforeDispatch
+ ", delayBeforeDispatch=" + delayBeforeDispatch
+ ", purgeOnNoConsumers=" + purgeOnNoConsumers
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 6d72088..37442b2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -167,6 +167,22 @@ public interface ServerSession extends SecurityAuth {
SimpleString filterString,
boolean temporary,
boolean durable,
+ int maxConsumers,
+ boolean purgeOnNoConsumers,
+ Boolean exclusive,
+ Boolean lastValue,
+ SimpleString lastValueKey,
+ Boolean nonDestructive,
+ Integer consumersBeforeDispatch,
+ Long delayBeforeDispatch,
+ boolean autoCreated) throws Exception;
+
+ Queue createQueue(SimpleString address,
+ SimpleString name,
+ RoutingType routingType,
+ SimpleString filterString,
+ boolean temporary,
+ boolean durable,
boolean autoCreated) throws Exception;
Queue createQueue(AddressInfo addressInfo,
@@ -289,6 +305,20 @@ public interface ServerSession extends SecurityAuth {
void createSharedQueue(SimpleString address,
SimpleString name,
RoutingType routingType,
+ SimpleString filterString,
+ boolean durable,
+ Integer maxConsumers,
+ Boolean purgeOnNoConsumers,
+ Boolean exclusive,
+ Boolean lastValue,
+ SimpleString lastValueKey,
+ Boolean nonDestructive,
+ Integer consumersBeforeDispatch,
+ Long delayBeforeDispatch) throws Exception;
+
+ void createSharedQueue(SimpleString address,
+ SimpleString name,
+ RoutingType routingType,
boolean durable,
SimpleString filterString) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/547b2aa5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
index 06b3d85..164f141 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AckReason.java
@@ -18,5 +18,5 @@
package org.apache.activemq.artemis.core.server.impl;
public enum AckReason {
- KILLED, EXPIRED, NORMAL
+ KILLED, EXPIRED, NORMAL, REPLACED
}
\ No newline at end of file