You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2018/04/14 14:48:13 UTC

[1/2] activemq-artemis git commit: ARTEMIS-1803 - Pass ServerConsumer to messageExpired and messageAcknowledged plugin callback methods

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 5535af1a8 -> c4763bc08


ARTEMIS-1803 - Pass ServerConsumer to messageExpired and
messageAcknowledged plugin callback methods

Knowing the consumer that expired or acked a message (if available) is
useful. Right now a message reference only contains a consumer id, which
by itself is not unique, so the actual consumer needs to be passed.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bddfa489
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bddfa489
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bddfa489

Branch: refs/heads/master
Commit: bddfa4892b1c0a4085069f7d015f0ac946f1d416
Parents: 5535af1
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Apr 12 12:44:48 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Sat Apr 14 10:34:09 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java | 10 ++--
 .../core/paging/cursor/PagedReferenceImpl.java  | 14 +++--
 .../artemis/core/server/MessageReference.java   |  4 +-
 .../activemq/artemis/core/server/Queue.java     |  8 ++-
 .../core/server/impl/LastValueQueue.java        | 10 +++-
 .../core/server/impl/MessageReferenceImpl.java  | 14 +++--
 .../artemis/core/server/impl/QueueImpl.java     | 54 ++++++++++++--------
 .../core/server/impl/ServerConsumerImpl.java    |  8 +--
 .../core/server/impl/ServerSessionImpl.java     |  5 +-
 .../server/plugin/ActiveMQServerPlugin.java     | 35 +++++++++++++
 .../impl/LoggingActiveMQServerPlugin.java       |  5 +-
 .../impl/LoggingActiveMQServerPluginLogger.java |  6 +--
 .../impl/ScheduledDeliveryHandlerTest.java      | 15 +++++-
 .../integration/plugin/CorePluginTest.java      | 45 ++++++++++++++--
 .../plugin/MethodCalledVerifier.java            |  4 +-
 .../unit/core/postoffice/impl/FakeQueue.java    | 17 +++++-
 16 files changed, 193 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index b478c7d..e0b02ae 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -54,15 +54,15 @@ import org.apache.activemq.command.RemoveInfo;
 
 public class AMQConsumer {
    private static final String AMQ_NOTIFICATIONS_DESTINATION = "activemq.notifications";
-   private AMQSession session;
+   private final AMQSession session;
    private final org.apache.activemq.command.ActiveMQDestination openwireDestination;
    private final boolean hasNotificationDestination;
-   private ConsumerInfo info;
+   private final ConsumerInfo info;
    private final ScheduledExecutorService scheduledPool;
    private ServerConsumer serverConsumer;
 
    private int prefetchSize;
-   private AtomicInteger currentWindow;
+   private final AtomicInteger currentWindow;
    private long messagePullSequence = 0;
    private MessagePullHandler messagePullHandler;
    //internal means we don't expose
@@ -284,7 +284,7 @@ public class AMQConsumer {
 
          if (ack.isIndividualAck() || ack.isStandardAck()) {
             for (MessageReference ref : ackList) {
-               ref.acknowledge(transaction);
+               ref.acknowledge(transaction, serverConsumer);
             }
          } else if (ack.isPoisonAck()) {
             for (MessageReference ref : ackList) {
@@ -302,7 +302,7 @@ public class AMQConsumer {
       }
       if (ack.isExpiredAck()) {
          for (MessageReference ref : ackList) {
-            ref.getQueue().expire(ref);
+            ref.getQueue().expire(ref, serverConsumer);
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 9a37bd8..081f7da 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.PagedMessage;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@@ -234,15 +235,20 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
 
    @Override
    public void acknowledge(Transaction tx) throws Exception {
-      acknowledge(tx, AckReason.NORMAL);
+      acknowledge(tx, null);
    }
 
    @Override
-   public void acknowledge(Transaction tx, AckReason reason) throws Exception {
+   public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
+      acknowledge(tx, AckReason.NORMAL, consumer);
+   }
+
+   @Override
+   public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
       if (tx == null) {
-         getQueue().acknowledge(this, reason);
+         getQueue().acknowledge(this, reason, consumer);
       } else {
-         getQueue().acknowledge(tx, this, reason);
+         getQueue().acknowledge(tx, this, reason, consumer);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 0db84c5..48a589f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -89,7 +89,9 @@ public interface MessageReference {
 
    void acknowledge(Transaction tx) throws Exception;
 
-   void acknowledge(Transaction tx, AckReason reason) throws Exception;
+   void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception;
+
+   void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception;
 
    void emptyConsumerID();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index c355dbf..a235352 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -107,11 +107,13 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void acknowledge(MessageReference ref) throws Exception;
 
-   void acknowledge(MessageReference ref, AckReason reason) throws Exception;
+   void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception;
+
+   void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
 
    void acknowledge(Transaction tx, MessageReference ref) throws Exception;
 
-   void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception;
+   void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception;
 
    void reacknowledge(Transaction tx, MessageReference ref) throws Exception;
 
@@ -221,6 +223,8 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void expire(MessageReference ref) throws Exception;
 
+   void expire(MessageReference ref, ServerConsumer consumer) throws Exception;
+
    boolean sendMessageToDeadLetterAddress(long messageID) throws Exception;
 
    int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index e3097d1..fc96591 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.Transaction;
@@ -276,8 +277,13 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
-      public void acknowledge(Transaction tx, AckReason reason) throws Exception {
-         ref.acknowledge(tx, reason);
+      public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
+         ref.acknowledge(tx, consumer);
+      }
+
+      @Override
+      public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
+         ref.acknowledge(tx, reason, consumer);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 4d077ae..96975e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 
@@ -190,15 +191,20 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    @Override
    public void acknowledge(Transaction tx) throws Exception {
-      acknowledge(tx, AckReason.NORMAL);
+      acknowledge(tx, null);
    }
 
    @Override
-   public void acknowledge(Transaction tx, AckReason reason) throws Exception {
+   public void acknowledge(Transaction tx, ServerConsumer consumer) throws Exception {
+      acknowledge(tx, AckReason.NORMAL, consumer);
+   }
+
+   @Override
+   public void acknowledge(Transaction tx, AckReason reason, ServerConsumer consumer) throws Exception {
       if (tx == null) {
-         getQueue().acknowledge(this, reason);
+         getQueue().acknowledge(this, reason, consumer);
       } else {
-         getQueue().acknowledge(tx, this, reason);
+         getQueue().acknowledge(tx, this, reason, consumer);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 5937491..957c2bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -72,6 +72,7 @@ import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueFactory;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -1223,11 +1224,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void acknowledge(final MessageReference ref) throws Exception {
-      acknowledge(ref, AckReason.NORMAL);
+      acknowledge(ref, null);
    }
 
    @Override
-   public void acknowledge(final MessageReference ref, AckReason reason) throws Exception {
+   public void acknowledge(final MessageReference ref, final ServerConsumer consumer) throws Exception {
+      acknowledge(ref, AckReason.NORMAL, consumer);
+   }
+
+   @Override
+   public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
       if (ref.isPaged()) {
          pageSubscription.ack((PagedReference) ref);
          postAcknowledge(ref);
@@ -1251,17 +1257,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (server != null && server.hasBrokerPlugins()) {
-         server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason));
+         server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
       }
    }
 
    @Override
    public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
-      acknowledge(tx, ref, AckReason.NORMAL);
+      acknowledge(tx, ref, AckReason.NORMAL, null);
    }
 
    @Override
-   public void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception {
+   public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
       if (ref.isPaged()) {
          pageSubscription.ackTx(tx, (PagedReference) ref);
 
@@ -1289,7 +1295,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
 
       if (server != null && server.hasBrokerPlugins()) {
-         server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason));
+         server.callBrokerPlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
       }
    }
 
@@ -1358,6 +1364,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void expire(final MessageReference ref) throws Exception {
+      expire(ref, null);
+   }
+
+   @Override
+   public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
       SimpleString messageExpiryAddress = expiryAddressFromMessageAddress(ref);
       if (messageExpiryAddress == null) {
          messageExpiryAddress = expiryAddressFromAddressSettings(ref);
@@ -1367,17 +1378,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          if (logger.isTraceEnabled()) {
             logger.trace("moving expired reference " + ref + " to address = " + messageExpiryAddress + " from queue=" + this.getName());
          }
-         move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED);
+         move(null, messageExpiryAddress, null, ref, false, AckReason.EXPIRED, consumer);
       } else {
          if (logger.isTraceEnabled()) {
             logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
          }
-         acknowledge(ref, AckReason.EXPIRED);
+         acknowledge(ref, AckReason.EXPIRED, consumer);
       }
 
       if (server != null && server.hasBrokerPlugins()) {
          final SimpleString expiryAddress = messageExpiryAddress;
-         server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress));
+         server.callBrokerPlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
       }
    }
 
@@ -1490,7 +1501,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          @Override
          public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
             incDelivering(ref);
-            acknowledge(tx, ref, ackReason);
+            acknowledge(tx, ref, ackReason, null);
             if (fromMessageReferences) {
                refRemoved(ref);
             }
@@ -1878,7 +1889,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                refRemoved(ref);
                incDelivering(ref);
                try {
-                  move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL);
+                  move(null, toAddress, binding, ref, rejectDuplicate, AckReason.NORMAL, null);
                } catch (Exception e) {
                   decDelivering(ref);
                   throw e;
@@ -1922,7 +1933,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             }
 
             if (!ignored) {
-               move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL);
+               move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL, null);
                refRemoved(ref);
                //move(toAddress, tx, ref, false, rejectDuplicates);
             }
@@ -2586,7 +2597,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       postOffice.route(copyMessage, tx, false, rejectDuplicate);
 
       if (expiry) {
-         acknowledge(tx, ref, AckReason.EXPIRED);
+         acknowledge(tx, ref, AckReason.EXPIRED, null);
       } else {
          acknowledge(tx, ref);
       }
@@ -2749,7 +2760,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          if (bindingList.getBindings().isEmpty()) {
             ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
-            acknowledge(tx, ref, AckReason.EXPIRED);
+            acknowledge(tx, ref, AckReason.EXPIRED, null);
          } else {
             move(expiryAddress, tx, ref, true, true);
          }
@@ -2760,7 +2771,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
          }
 
-         acknowledge(tx, ref, AckReason.EXPIRED);
+         acknowledge(tx, ref, AckReason.EXPIRED, null);
       }
    }
 
@@ -2777,15 +2788,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          if (bindingList.getBindings().isEmpty()) {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
-            ref.acknowledge(tx, AckReason.KILLED);
+            ref.acknowledge(tx, AckReason.KILLED, null);
          } else {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
-            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED);
+            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
          }
       } else {
          ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
 
-         ref.acknowledge(tx, AckReason.KILLED);
+         ref.acknowledge(tx, AckReason.KILLED, null);
       }
    }
 
@@ -2794,7 +2805,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final Binding binding,
                      final MessageReference ref,
                      final boolean rejectDuplicate,
-                     final AckReason reason) throws Exception {
+                     final AckReason reason,
+                     final ServerConsumer consumer) throws Exception {
       Transaction tx;
 
       if (originalTX != null) {
@@ -2810,7 +2822,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);
 
-      acknowledge(tx, ref, reason);
+      acknowledge(tx, ref, reason, consumer);
 
       if (originalTX == null) {
          tx.commit();
@@ -3046,7 +3058,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          Transaction transaction = new TransactionImpl(storageManager);
          for (MessageReference reference : refs) {
             incDelivering(reference); // post ack will decrement this, so need to inc
-            acknowledge(transaction, reference, AckReason.KILLED);
+            acknowledge(transaction, reference, AckReason.KILLED, null);
          }
          transaction.commit();
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 8ef0fa1..c81105a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -423,7 +423,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                }
 
                // With pre-ack, we ack *before* sending to the client
-               ref.getQueue().acknowledge(ref);
+               ref.getQueue().acknowledge(ref, this);
                acks++;
             }
 
@@ -633,7 +633,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
          if (!deliveringRefs.isEmpty()) {
             for (MessageReference ref : deliveringRefs) {
                if (performACK) {
-                  ref.acknowledge(tx);
+                  ref.acknowledge(tx, this);
 
                   performACK = false;
                } else {
@@ -863,7 +863,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
                throw ils;
             }
 
-            ref.acknowledge(tx);
+            ref.acknowledge(tx, this);
 
             acks++;
          }
@@ -926,7 +926,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
             throw ils;
          }
 
-         ref.acknowledge(tx);
+         ref.acknowledge(tx, this);
 
          acks++;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 55125bd..f92b45a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -926,10 +926,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public void expire(final long consumerID, final long messageID) throws Exception {
-      MessageReference ref = locateConsumer(consumerID).removeReferenceByID(messageID);
+      final ServerConsumer consumer = locateConsumer(consumerID);
+      MessageReference ref = consumer.removeReferenceByID(messageID);
 
       if (ref != null) {
-         ref.getQueue().expire(ref);
+         ref.getQueue().expire(ref, consumer);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
index ed00ab0..db8a922 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerPlugin.java
@@ -548,23 +548,58 @@ public interface ActiveMQServerPlugin {
     * @param message The expired message
     * @param messageExpiryAddress The message expiry address if exists
     * @throws ActiveMQException
+    *
+    * @deprecated use {@link #messageExpired(MessageReference, SimpleString, ServerConsumer)}
     */
+   @Deprecated
    default void messageExpired(MessageReference message, SimpleString messageExpiryAddress) throws ActiveMQException {
 
    }
 
    /**
+    * A message has been expired
+    *
+    * @param message The expired message
+    * @param messageExpiryAddress The message expiry address if exists
+    * @param consumer the Consumer that expired the message - this field is optional
+    * and can be null
+    * @throws ActiveMQException
+    */
+   default void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) throws ActiveMQException {
+
+   }
+
+   /**
     * A message has been acknowledged
     *
     * @param ref The acked message
     * @param reason The ack reason
     * @throws ActiveMQException
+    *
+    * @deprecated use {@link #messageAcknowledged(MessageReference, AckReason, ServerConsumer)}
     */
+   @Deprecated
    default void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
 
    }
 
    /**
+    * A message has been acknowledged
+    *
+    * @param ref The acked message
+    * @param reason The ack reason
+    * @param consumer the Consumer that acknowledged the message - this field is optional
+    * and can be null
+    * @throws ActiveMQException
+    *
+    */
+   default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
+      //by default call the old method for backwards compatibility
+      this.messageAcknowledged(ref, reason);
+   }
+
+
+   /**
     * Before a bridge is deployed
     *
     * @param config The bridge configuration

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
index 3e66bf1..140c3e1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
@@ -617,7 +617,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
     * @throws ActiveMQException
     */
    @Override
-   public void messageAcknowledged(MessageReference ref, AckReason reason) throws ActiveMQException {
+   public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
       if (logAll || logDeliveringEvents) {
 
          //details - debug logging
@@ -629,7 +629,8 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
 
             // info level logging
             LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
-                                                                         (ref == null ? UNAVAILABLE : ref.hasConsumerId() ? Long.toString(ref.getConsumerId()) : null),
+                                                                         (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null),
+                                                                         (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())),
                                                                          (queue == null ? UNAVAILABLE : queue.getName().toString()),
                                                                          reason);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
index 90c5dec..f519dd0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
@@ -129,9 +129,9 @@ public interface LoggingActiveMQServerPluginLogger extends BasicLogger {
    void messageExpired(MessageReference message, SimpleString messageExpiryAddress);
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 841014, value = "acknowledged message ID: {0}, with messageRef consumerID: {1}, messageRef QueueName: {2}," +
-      "  with ackReason: {3}", format = Message.Format.MESSAGE_FORMAT)
-   void messageAcknowledged(String messageID, String consumerID, String queueName, AckReason reason);
+   @Message(id = 841014, value = "acknowledged message ID: {0}, messageRef sessionID: {1}, with messageRef consumerID: {2}, messageRef QueueName: {3}," +
+      "  with ackReason: {4}", format = Message.Format.MESSAGE_FORMAT)
+   void messageAcknowledged(String messageID, String sessionID, String consumerID, String queueName, AckReason reason);
 
    @LogMessage(level = Logger.Level.INFO)
    @Message(id = 841015, value = "deployed bridge: {0}", format = Message.Format.MESSAGE_FORMAT)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2cae2c7..fdce2d0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -49,6 +49,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
@@ -959,7 +960,12 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
+      public void acknowledge(MessageReference ref, ServerConsumer consumer) throws Exception {
+
+      }
+
+      @Override
+      public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
 
       }
 
@@ -969,7 +975,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
+      public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
 
       }
 
@@ -1169,6 +1175,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void expire(MessageReference ref, ServerConsumer consumer) throws Exception {
+
+      }
+
+      @Override
       public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
index 6545f70..f83b3ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/CorePluginTest.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
 
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -76,6 +77,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
 import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
@@ -114,6 +116,13 @@ public class CorePluginTest extends JMSTestBase {
 
    @Test
    public void testSendReceive() throws Exception {
+      final AckPluginVerifier ackVerifier = new AckPluginVerifier((consumer, reason) -> {
+         assertEquals(AckReason.NORMAL, reason);
+         assertNotNull(consumer);
+      });
+
+      server.registerBrokerPlugin(ackVerifier);
+
       conn = cf.createConnection();
       conn.start();
       Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -139,6 +148,8 @@ public class CorePluginTest extends JMSTestBase {
       assertEquals("configurationVerifier is invoked", 1, configurationVerifier.afterSendCounter.get());
       assertEquals("configurationVerifier is invoked", 1, configurationVerifier.successRoutedCounter.get());
       assertEquals("configurationVerifier config set", "val_1", configurationVerifier.value1);
+
+      assertFalse(ackVerifier.getErrorMsg(), ackVerifier.hasError());
    }
 
    @Test
@@ -199,7 +210,8 @@ public class CorePluginTest extends JMSTestBase {
 
    @Test
    public void testMessageExpireServer() throws Exception {
-      server.registerBrokerPlugin(new ExpiredPluginVerifier());
+      final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
+      server.registerBrokerPlugin(expiredVerifier);
 
       conn = cf.createConnection();
       conn.setClientID("test");
@@ -227,11 +239,13 @@ public class CorePluginTest extends JMSTestBase {
       verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
             AFTER_CLOSE_SESSION);
 
+      assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
    }
 
    @Test
    public void testMessageExpireClient() throws Exception {
-      server.registerBrokerPlugin(new ExpiredPluginVerifier());
+      final AckPluginVerifier expiredVerifier = new AckPluginVerifier((ref, reason) -> assertEquals(AckReason.EXPIRED, reason));
+      server.registerBrokerPlugin(expiredVerifier);
 
       conn = cf.createConnection();
       conn.start();
@@ -260,6 +274,7 @@ public class CorePluginTest extends JMSTestBase {
       verifier.validatePluginMethodsEquals(2, BEFORE_CREATE_SESSION, AFTER_CREATE_SESSION, BEFORE_CLOSE_SESSION,
             AFTER_CLOSE_SESSION);
 
+      assertFalse(expiredVerifier.getErrorMsg(), expiredVerifier.hasError());
    }
 
    @Test
@@ -324,11 +339,31 @@ public class CorePluginTest extends JMSTestBase {
       verifier.validatePluginMethodsEquals(1, BEFORE_UPDATE_ADDRESS, AFTER_UPDATE_ADDRESS);
    }
 
-   private class ExpiredPluginVerifier implements ActiveMQServerPlugin {
+   private class AckPluginVerifier implements ActiveMQServerPlugin {
+
+      private BiConsumer<ServerConsumer, AckReason> assertion;
+      private Throwable error;
+
+      AckPluginVerifier(BiConsumer<ServerConsumer, AckReason> assertion) {
+         this.assertion = assertion;
+      }
 
       @Override
-      public void messageAcknowledged(MessageReference ref, AckReason reason) {
-         assertEquals(AckReason.EXPIRED, reason);
+      public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
+         try {
+            assertion.accept(consumer, reason);
+         } catch (Throwable e) {
+            error = e;
+            throw e;
+         }
+      }
+
+      private boolean hasError() {
+         return error != null;
+      }
+
+      private String getErrorMsg() {
+         return hasError() ? error.getMessage() : "";
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
index 8e343ca..8977ba5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MethodCalledVerifier.java
@@ -272,13 +272,13 @@ public class MethodCalledVerifier implements ActiveMQServerPlugin {
    }
 
    @Override
-   public void messageExpired(MessageReference message, SimpleString messageExpiryAddress) {
+   public void messageExpired(MessageReference message, SimpleString messageExpiryAddress, ServerConsumer consumer) {
       Preconditions.checkNotNull(message);
       methodCalled(MESSAGE_EXPIRED);
    }
 
    @Override
-   public void messageAcknowledged(MessageReference ref, AckReason reason) {
+   public void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) {
       Preconditions.checkNotNull(ref);
       Preconditions.checkNotNull(reason);
       methodCalled(MESSAGE_ACKED);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bddfa489/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 25300d3c..bc628f5 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
@@ -213,7 +214,13 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
+   public void acknowledge(final MessageReference ref, ServerConsumer consumer) throws Exception {
+      // no-op
+
+   }
+
+   @Override
+   public void acknowledge(MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
       // no-op
 
    }
@@ -225,7 +232,7 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
+   public void acknowledge(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws Exception {
       // no-op
 
    }
@@ -311,6 +318,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void expire(final MessageReference ref, final ServerConsumer consumer) throws Exception {
+      // no-op
+
+   }
+
+   @Override
    public boolean expireReference(final long messageID) throws Exception {
       // no-op
       return false;


[2/2] activemq-artemis git commit: This closes #2012

Posted by ta...@apache.org.
This closes #2012


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4763bc0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4763bc0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4763bc0

Branch: refs/heads/master
Commit: c4763bc088c0d83c323b42ce5be0f542779368dd
Parents: 5535af1 bddfa48
Author: Timothy Bish <ta...@gmail.com>
Authored: Sat Apr 14 10:47:47 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Sat Apr 14 10:47:47 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQConsumer.java | 10 ++--
 .../core/paging/cursor/PagedReferenceImpl.java  | 14 +++--
 .../artemis/core/server/MessageReference.java   |  4 +-
 .../activemq/artemis/core/server/Queue.java     |  8 ++-
 .../core/server/impl/LastValueQueue.java        | 10 +++-
 .../core/server/impl/MessageReferenceImpl.java  | 14 +++--
 .../artemis/core/server/impl/QueueImpl.java     | 54 ++++++++++++--------
 .../core/server/impl/ServerConsumerImpl.java    |  8 +--
 .../core/server/impl/ServerSessionImpl.java     |  5 +-
 .../server/plugin/ActiveMQServerPlugin.java     | 35 +++++++++++++
 .../impl/LoggingActiveMQServerPlugin.java       |  5 +-
 .../impl/LoggingActiveMQServerPluginLogger.java |  6 +--
 .../impl/ScheduledDeliveryHandlerTest.java      | 15 +++++-
 .../integration/plugin/CorePluginTest.java      | 45 ++++++++++++++--
 .../plugin/MethodCalledVerifier.java            |  4 +-
 .../unit/core/postoffice/impl/FakeQueue.java    | 17 +++++-
 16 files changed, 193 insertions(+), 61 deletions(-)
----------------------------------------------------------------------