You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/04/14 14:48:13 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1803 - Pass ServerConsumer
to messageExpired and messageAcknowledged plugin callback methods
Repository: activemq-artemis
Updated Branches:
refs/heads/master 5535af1a8 -> c4763bc08
ARTEMIS-1803 - Pass ServerConsumer to messageExpired and
messageAcknowledged plugin callback methods
Knowing the consumer that expired or acked a message (if available) is
useful and right now a message reference only contains a consumer id
which by itself is not unique so the actual consumer needs to be passed
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bddfa489
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bddfa489
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bddfa489
Branch: refs/heads/master
Commit: bddfa4892b1c0a4085069f7d015f0ac946f1d416
Parents: 5535af1
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Apr 12 12:44:48 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Sat Apr 14 10:34:09 2018 -0400
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQConsumer.java | 10 ++--
.../core/paging/cursor/PagedReferenceImpl.java | 14 +++--
.../artemis/core/server/MessageReference.java | 4 +-
.../activemq/artemis/core/server/Queue.java | 8 ++-
.../core/server/impl/LastValueQueue.java | 10 +++-
.../core/server/impl/MessageReferenceImpl.java | 14 +++--
.../artemis/core/server/impl/QueueImpl.java | 54 ++++++++++++--------
.../core/server/impl/ServerConsumerImpl.java | 8 +--
.../core/server/impl/ServerSessionImpl.java | 5 +-
.../server/plugin/ActiveMQServerPlugin.java | 35 +++++++++++++
.../impl/LoggingActiveMQServerPlugin.java | 5 +-
.../impl/LoggingActiveMQServerPluginLogger.java | 6 +--
.../impl/ScheduledDeliveryHandlerTest.java | 15 +++++-
.../integration/plugin/CorePluginTest.java | 45 ++++++++++++++--
.../plugin/MethodCalledVerifier.java | 4 +-
.../unit/core/postoffice/impl/FakeQueue.java | 17 +++++-
16 files changed, 193 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index b478c7d..e0b02ae 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -54,15 +54,15 @@ import org.apache.activemq.command.RemoveInfo;
public class AMQConsumer {
private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
- private AMQSession session;
+ private final AMQSession session;
private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
private final boolean hasNotificationDestination;
- private ConsumerInfo info;
+ private final ConsumerInfo info;
private final ScheduledExecutorService scheduledPool;
private ServerConsumer serverConsumer;
private int prefetchSize;
- private AtomicInteger currentWindow;
+ private final AtomicInteger currentWindow;
private long messagePullSequence = 0;
private MessagePullHandler messagePullHandler;
//internal means we don't expose
@@ -284,7 +284,7 @@ public class AMQConsumer {
if (ack.isIndividualAck() || ack.isStandardAck()) {
for (MessageReference ref : ackList) {
- ref.acknowledge(transaction);
+ ref.acknowledge(transaction, serverConsumer);
}
} else if (ack.isPoisonAck()) {
for (MessageReference ref : ackList) {
@@ -302,7 +302,7 @@ public class AMQConsumer {
}
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
- ref.getQueue().expire(ref);
+ ref.getQueue().expire(ref, serverConsumer);
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 9a37bd8..081f7da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@@ -234,15 +235,20 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
@Override
public void acknowledge(Transaction tx) throws Exception {
- acknowledge(tx, AckReason.NORMAL);
+ acknowledge(tx, null);
}
@Override
- public void acknowledge(Transaction tx, AckReason reason) throws Exception {
+ public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
+ acknowledge(tx, AckReason.NORMAL, consumer);
+ }
+
+ @Override
+ public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
if (tx == null) {
- getQueue().acknowledge(this, reason);
+ getQueue().acknowledge(this, reason, consumer);
} else {
- getQueue().acknowledge(tx, this, reason);
+ getQueue().acknowledge(tx, this, reason, consumer);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 0db84c5..48a589f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -89,7 +89,9 @@ public interface MessageReference {
void acknowledge(Transaction tx) throws Exception;
- void acknowledge(Transaction tx, AckReason reason) throws Exception;
+ void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception;
+
+ void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception;
void emptyConsumerID();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index c355dbf..a235352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -107,11 +107,13 @@ public interface Queue extends Bindable,CriticalComponent {
void acknowledge(MessageReference ref) throws Exception;
- void acknowledge(MessageReference ref, AckReason reason) throws Exception;
+ void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception;
+
+ void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
void acknowledge(Transaction tx, MessageReference ref) throws Exception;
- void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception;
+ void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
void reacknowledge(Transaction tx, MessageReference ref) throws Exception;
@@ -221,6 +223,8 @@ public interface Queue extends Bindable,CriticalComponent {
void expire(MessageReference ref) throws Exception;
+ void expire(MessageReference ref, ServerConsumer consumer) throws Exception;
+
boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index e3097d1..fc96591 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -276,8 +277,13 @@ public class LastValueQueue extends QueueImpl {
}
@Override
- public void acknowledge(Transaction tx, AckReason reason) throws Exception {
- ref.acknowledge(tx, reason);
+ public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
+ ref.acknowledge(tx, consumer);
+ }
+
+ @Override
+ public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
+ ref.acknowledge(tx, reason, consumer);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 4d077ae..96975e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@@ -190,15 +191,20 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
@Override
public void acknowledge(Transaction tx) throws Exception {
- acknowledge(tx, AckReason.NORMAL);
+ acknowledge(tx, null);
}
@Override
- public void acknowledge(Transaction tx, AckReason reason) throws Exception {
+ public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
+ acknowledge(tx, AckReason.NORMAL, consumer);
+ }
+
+ @Override
+ public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
if (tx == null) {
- getQueue().acknowledge(this, reason);
+ getQueue().acknowledge(this, reason, consumer);
} else {
- getQueue().acknowledge(tx, this, reason);
+ getQueue().acknowledge(tx, this, reason, consumer);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 5937491..957c2bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -1223,11 +1224,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void acknowledge(final MessageReference ref) throws Exception {
- acknowledge(ref, AckReason.NORMAL);
+ acknowledge(ref, null);
}
@Override
- public void acknowledge(final MessageReference ref, AckReason reason) throws Exception {
+ public void acknowledge(final MessageReference ref, final ServerConsumer consumer) throws Exception {
+ acknowledge(ref, AckReason.NORMAL, consumer);
+ }
+
+ @Override
+ public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (ref.isPaged()) {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref);
@@ -1251,17 +1257,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (server != null && server.hasBrokerPlugins()) {
- server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason));
+ server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}
}
@Override
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
- acknowledge(tx, ref, AckReason.NORMAL);
+ acknowledge(tx, ref, AckReason.NORMAL, null);
}
@Override
- public void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception {
+ public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
@@ -1289,7 +1295,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (server != null && server.hasBrokerPlugins()) {
- server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason));
+ server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
}
}
@@ -1358,6 +1364,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void expire(final MessageReference ref) throws Exception {
+ expire(ref, null);
+ }
+
+ @Override
+ public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref);
if (messageExpiryAddress == null) {
messageExpiryAddress = expiryAddressFromAddressSettings(ref);
@@ -1367,17 +1378,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName());
}
- move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED);
+ move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer);
} else {
if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
}
- acknowledge(ref, AckReason.EXPIRED);
+ acknowledge(ref, AckReason.EXPIRED, consumer);
}
if (server != null && server.hasBrokerPlugins()) {
final SimpleString expiryAddress = messageExpiryAddress;
- server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress));
+ server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
}
}
@@ -1490,7 +1501,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
incDelivering(ref);
- acknowledge(tx, ref, ackReason);
+ acknowledge(tx, ref, ackReason, null);
if (fromMessageReferences) {
refRemoved(ref);
}
@@ -1878,7 +1889,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
refRemoved(ref);
incDelivering(ref);
try {
- move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
+ move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null);
} catch (Exception e) {
decDelivering(ref);
throw e;
@@ -1922,7 +1933,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
if (!ignored) {
- move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
+ move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null);
refRemoved(ref);
//move(toAddress, tx, ref, false, rejectDuplicates);
}
@@ -2586,7 +2597,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
postOffice.route(copyMessage, tx, false, rejectDuplicate);
if (expiry) {
- acknowledge(tx, ref, AckReason.EXPIRED);
+ acknowledge(tx, ref, AckReason.EXPIRED, null);
} else {
acknowledge(tx, ref);
}
@@ -2749,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
- acknowledge(tx, ref, AckReason.EXPIRED);
+ acknowledge(tx, ref, AckReason.EXPIRED, null);
} else {
move(expiryAddress, tx, ref, true, true);
}
@@ -2760,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
}
- acknowledge(tx, ref, AckReason.EXPIRED);
+ acknowledge(tx, ref, AckReason.EXPIRED, null);
}
}
@@ -2777,15 +2788,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
- ref.acknowledge(tx, AckReason.KILLED);
+ ref.acknowledge(tx, AckReason.KILLED, null);
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
- move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED);
+ move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
- ref.acknowledge(tx, AckReason.KILLED);
+ ref.acknowledge(tx, AckReason.KILLED, null);
}
}
@@ -2794,7 +2805,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
final Binding binding,
final MessageReference ref,
final boolean rejectDuplicate,
- final AckReason reason) throws Exception {
+ final AckReason reason,
+ final ServerConsumer consumer) throws Exception {
Transaction tx;
if (originalTX != null) {
@@ -2810,7 +2822,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
- acknowledge(tx, ref, reason);
+ acknowledge(tx, ref, reason, consumer);
if (originalTX == null) {
tx.commit();
@@ -3046,7 +3058,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
Transaction transaction = new TransactionImpl(storageManager);
for (MessageReference reference : refs) {
incDelivering(reference); // post ack will decrement this, so need to inc
- acknowledge(transaction, reference, AckReason.KILLED);
+ acknowledge(transaction, reference, AckReason.KILLED, null);
}
transaction.commit();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 8ef0fa1..c81105a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -423,7 +423,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
// With pre-ack, we ack *before* sending to the client
- ref.getQueue().acknowledge(ref);
+ ref.getQueue().acknowledge(ref, this);
acks++;
}
@@ -633,7 +633,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
if (!deliveringRefs.isEmpty()) {
for (MessageReference ref : deliveringRefs) {
if (performACK) {
- ref.acknowledge(tx);
+ ref.acknowledge(tx, this);
performACK = false;
} else {
@@ -863,7 +863,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
throw ils;
}
- ref.acknowledge(tx);
+ ref.acknowledge(tx, this);
acks++;
}
@@ -926,7 +926,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
throw ils;
}
- ref.acknowledge(tx);
+ ref.acknowledge(tx, this);
acks++;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 55125bd..f92b45a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -926,10 +926,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public void expire(final long consumerID, final long messageID) throws Exception {
- MessageReference ref = locateConsumer(consumerID).removeReferenceByID(messageID);
+ final ServerConsumer consumer = locateConsumer(consumerID);
+ MessageReference ref = consumer.removeReferenceByID(messageID);
if (ref != null) {
- ref.getQueue().expire(ref);
+ ref.getQueue().expire(ref, consumer);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index ed00ab0..db8a922 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -548,23 +548,58 @@ public interface ActiveMQServerPlugin {
* @param message The expired message
* @param messageExpiryAddress The message expiry address if exists
* @throws ActiveMQException
+ *
+ * @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)}
*/
+ @Deprecated
default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
}
/**
+ * A message has been expired
+ *
+ * @param message The expired message
+ * @param messageExpiryAddress The message expiry address if exists
+ * @param consumer the Consumer that acknowledged the message - this field is optional
+ * and can be null
+ * @throws ActiveMQException
+ */
+ default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException {
+
+ }
+
+ /**
* A message has been acknowledged
*
* @param ref The acked message
* @param reason The ack reason
* @throws ActiveMQException
+ *
+ * @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)}
*/
+ @Deprecated
default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
}
/**
+ * A message has been acknowledged
+ *
+ * @param ref The acked message
+ * @param reason The ack reason
+ * @param consumer the Consumer that acknowledged the message - this field is optional
+ * and can be null
+ * @throws ActiveMQException
+ *
+ */
+ default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
+ //by default call the old method for backwards compatibility
+ this.messageAcknowledged(ref, reason);
+ }
+
+
+ /**
* Before a bridge is deployed
*
* @param config The bridge configuration
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
index 3e66bf1..140c3e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
@@ -617,7 +617,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
* @throws ActiveMQException
*/
@Override
- public void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
+ public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
if (logAll || logDeliveringEvents) {
//details - debug logging
@@ -629,7 +629,8 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
// info level logging
LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
- (ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null),
+ (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null),
+ (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())),
(queue == null ? UNAVAILABLE : queue.getName().toString()),
reason);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
index 90c5dec..f519dd0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
@@ -129,9 +129,9 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
void messageExpired(MessageReference message, SimpleString messageExpiryAddress);
@LogMessage(level = Logger.Level.INFO)
- @Message(id = 841014, value = "acknowledged message ID: {0}, with messageRef consumerID: {1}, messageRef QueueName: {2}," +
- " with ackReason: {3}", format = Message.Format.MESSAGE_FORMAT)
- void messageAcknowledged(String messageID, String consumerID, String queueName, AckReason reason);
+ @Message(id = 841014, value = "acknowledged message ID: {0}, messageRef sessionID: {1}, with messageRef consumerID: {2}, messageRef QueueName: {3}," +
+ " with ackReason: {4}", format = Message.Format.MESSAGE_FORMAT)
+ void messageAcknowledged(String messageID, String sessionID, String consumerID, String queueName, AckReason reason);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 841015, value = "deployed bridge: {0}", format = Message.Format.MESSAGE_FORMAT)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2cae2c7..fdce2d0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -959,7 +960,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
+ public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {
+
+ }
+
+ @Override
+ public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@@ -969,7 +975,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
+ public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
}
@@ -1169,6 +1175,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
+
+ }
+
+ @Override
public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index 6545f70..f83b3ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -76,6 +77,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
@@ -114,6 +116,13 @@ public class CorePluginTest extends JMSTestBase {
@Test
public void testSendReceive() throws Exception {
+ final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
+ assertEquals(AckReason.NORMAL, reason);
+ assertNotNull(consumer);
+ });
+
+ server.registerBrokerPlugin(ackVerifier);
+
conn = cf.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -139,6 +148,8 @@ public class CorePluginTest extends JMSTestBase {
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get());
assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get());
assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1);
+
+ assertFalse(ackVerifier.getErrorMsg(), ackVerifier.hasError());
}
@Test
@@ -199,7 +210,8 @@ public class CorePluginTest extends JMSTestBase {
@Test
public void testMessageExpireServer() throws Exception {
- server.registerBrokerPlugin(new ExpiredPluginVerifier());
+ final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
+ server.registerBrokerPlugin(expiredVerifier);
conn = cf.createConnection();
conn.setClientID("test");
@@ -227,11 +239,13 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
+ assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
}
@Test
public void testMessageExpireClient() throws Exception {
- server.registerBrokerPlugin(new ExpiredPluginVerifier());
+ final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
+ server.registerBrokerPlugin(expiredVerifier);
conn = cf.createConnection();
conn.start();
@@ -260,6 +274,7 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
AFTER_CLOSE_SESSION);
+ assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
}
@Test
@@ -324,11 +339,31 @@ public class CorePluginTest extends JMSTestBase {
verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
}
- private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
+ private class AckPluginVerifier implements ActiveMQServerPlugin {
+
+ private BiConsumer<ServerConsumer, AckReason> assertion;
+ private Throwable error;
+
+ AckPluginVerifier(BiConsumer<ServerConsumer, AckReason> assertion) {
+ this.assertion = assertion;
+ }
@Override
- public void messageAcknowledged(MessageReference ref, AckReason reason) {
- assertEquals(AckReason.EXPIRED, reason);
+ public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
+ try {
+ assertion.accept(consumer, reason);
+ } catch (Throwable e) {
+ error = e;
+ throw e;
+ }
+ }
+
+ private boolean hasError() {
+ return error != null;
+ }
+
+ private String getErrorMsg() {
+ return hasError() ? error.getMessage() : "";
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index 8e343ca..8977ba5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -272,13 +272,13 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
}
@Override
- public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+ public void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) {
Preconditions.checkNotNull(message);
methodCalled(MESSAGE_EXPIRED);
}
@Override
- public void messageAcknowledged(MessageReference ref, AckReason reason) {
+ public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
Preconditions.checkNotNull(ref);
Preconditions.checkNotNull(reason);
methodCalled(MESSAGE_ACKED);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 25300d3c..bc628f5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@@ -213,7 +214,13 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
+ public void acknowledge(final MessageReference ref, ServerConsumer consumer) throws Exception {
+ // no-op
+
+ }
+
+ @Override
+ public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
// no-op
}
@@ -225,7 +232,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
+ public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
// no-op
}
@@ -311,6 +318,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
+ // no-op
+
+ }
+
+ @Override
public boolean expireReference(final long messageID) throws Exception {
// no-op
return false;
[2/2] activemq-artemis git commit: This closes #2012
Posted by ta...@apache.org.
This closes #2012
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4763bc0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4763bc0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4763bc0
Branch: refs/heads/master
Commit: c4763bc088c0d83c323b42ce5be0f542779368dd
Parents: 5535af1 bddfa48
Author: Timothy Bish <ta...@gmail.com>
Authored: Sat Apr 14 10:47:47 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Sat Apr 14 10:47:47 2018 -0400
----------------------------------------------------------------------
.../core/protocol/openwire/amq/AMQConsumer.java | 10 ++--
.../core/paging/cursor/PagedReferenceImpl.java | 14 +++--
.../artemis/core/server/MessageReference.java | 4 +-
.../activemq/artemis/core/server/Queue.java | 8 ++-
.../core/server/impl/LastValueQueue.java | 10 +++-
.../core/server/impl/MessageReferenceImpl.java | 14 +++--
.../artemis/core/server/impl/QueueImpl.java | 54 ++++++++++++--------
.../core/server/impl/ServerConsumerImpl.java | 8 +--
.../core/server/impl/ServerSessionImpl.java | 5 +-
.../server/plugin/ActiveMQServerPlugin.java | 35 +++++++++++++
.../impl/LoggingActiveMQServerPlugin.java | 5 +-
.../impl/LoggingActiveMQServerPluginLogger.java | 6 +--
.../impl/ScheduledDeliveryHandlerTest.java | 15 +++++-
.../integration/plugin/CorePluginTest.java | 45 ++++++++++++++--
.../plugin/MethodCalledVerifier.java | 4 +-
.../unit/core/postoffice/impl/FakeQueue.java | 17 +++++-
16 files changed, 193 insertions(+), 61 deletions(-)
----------------------------------------------------------------------