You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/27 03:27:53 UTC

[activemq-artemis] 02/03: ARTEMIS-3400 add audit logging for message ack

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f554806ec315cb8b4d0844b12674a825e08d44a3
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jul 23 12:54:01 2021 -0500

    ARTEMIS-3400 add audit logging for message ack
    
    Aside from adding audit logging for message acknowledgement this commit
    also consolidates the two nearly identical acknowledge method
    implementations in o.a.a.a.c.s.i.QueueImpl. This avoids duplicating
    code for audit logging, plugin invocation, etc. There is no semantic
    change.
---
 .../apache/activemq/artemis/logs/AuditLogger.java  |  9 +++
 .../artemis/core/server/impl/QueueImpl.java        | 73 +++++++++++-----------
 .../logging/AuditLoggerAMQPMutualSSLTest.java      |  1 +
 .../tests/smoke/logging/AuditLoggerTest.java       |  3 +-
 4 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 9a41027..94a879c 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2437,6 +2437,15 @@ public interface AuditLogger extends BasicLogger {
    @Message(id = 601501, value = "User {0} is consuming a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT)
    void consumeMessage(String user, String address, String message);
 
+   //hot path log using a different logger
+   static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message) {
+      MESSAGE_LOGGER.acknowledgeMessage(getCaller(user, remoteAddress), queue, message);
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601502, value = "User {0} is acknowledging a message from {1}: {2}", format = Message.Format.MESSAGE_FORMAT)
+   void acknowledgeMessage(String user, String queue, String message);
+
    /*
     * This logger is focused on user interaction from the console or thru resource specific functions in the management layer/JMX
     * */
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 37af7e4..300bed6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -101,6 +101,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
 import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AuditLogger;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.utils.ArtemisCloseable;
 import org.apache.activemq.artemis.utils.BooleanUtil;
@@ -1825,32 +1826,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void acknowledge(final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
-      if (nonDestructive && reason == AckReason.NORMAL) {
-         decDelivering(ref);
-         if (logger.isDebugEnabled()) {
-            logger.debug("acknowledge ignored nonDestructive=true and reason=NORMAL");
-         }
-      } else {
-         if (ref.isPaged()) {
-            pageSubscription.ack((PagedReference) ref);
-            postAcknowledge(ref, reason);
-         } else {
-            Message message = ref.getMessage();
-
-            boolean durableRef = message.isDurable() && isDurable();
-
-            if (durableRef) {
-               storageManager.storeAcknowledge(id, message.getMessageID());
-            }
-            postAcknowledge(ref, reason);
-         }
-
-         ackAttempts.incrementAndGet();
-
-         if (server != null && server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
-         }
-      }
+      acknowledge(null, ref, reason, consumer);
    }
 
    @Override
@@ -1860,34 +1836,59 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public void acknowledge(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws Exception {
-      RefsOperation refsOperation = getRefsOperation(tx, reason);
+      boolean transactional = tx != null;
+      RefsOperation refsOperation = null;
+      if (transactional) {
+         refsOperation = getRefsOperation(tx, reason);
+      }
 
       if (nonDestructive && reason == AckReason.NORMAL) {
-         refsOperation.addOnlyRefAck(ref);
+         if (transactional) {
+            refsOperation.addOnlyRefAck(ref);
+         } else {
+            decDelivering(ref);
+         }
          if (logger.isDebugEnabled()) {
             logger.debug("acknowledge tx ignored nonDestructive=true and reason=NORMAL");
          }
       } else {
          if (ref.isPaged()) {
-            pageSubscription.ackTx(tx, (PagedReference) ref);
-
-            refsOperation.addAck(ref);
+            if (transactional) {
+               pageSubscription.ackTx(tx, (PagedReference) ref);
+               refsOperation.addAck(ref);
+            } else {
+               pageSubscription.ack((PagedReference) ref);
+               postAcknowledge(ref, reason);
+            }
          } else {
             Message message = ref.getMessage();
 
             boolean durableRef = message.isDurable() && isDurable();
 
             if (durableRef) {
-               storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
-
-               tx.setContainsPersistent();
+               if (transactional) {
+                  storageManager.storeAcknowledgeTransactional(tx.getID(), id, message.getMessageID());
+                  tx.setContainsPersistent();
+               } else {
+                  storageManager.storeAcknowledge(id, message.getMessageID());
+               }
+            }
+            if (transactional) {
+               ackAttempts.incrementAndGet();
+               refsOperation.addAck(ref);
+            } else {
+               postAcknowledge(ref, reason);
             }
+         }
 
+         if (!transactional) {
             ackAttempts.incrementAndGet();
-
-            refsOperation.addAck(ref);
          }
 
+         if (AuditLogger.isMessageLoggingEnabled()) {
+            ServerSession session = server.getSessionByID(consumer.getSessionID());
+            AuditLogger.coreAcknowledgeMessage(session.getRemotingConnection().getAuditSubject(), session.getRemotingConnection().getRemoteAddress(), getName().toString(), ref.getMessage().toString());
+         }
          if (server != null && server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
          }
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java
index 64f9d19..f21680a 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerAMQPMutualSSLTest.java
@@ -85,5 +85,6 @@ public class AuditLoggerAMQPMutualSSLTest extends AuditLoggerTestBase {
       checkAuditLogRecord(true, "AMQ601500: User myUser(producers)@", "is sending a message AMQPStandardMessage");
       checkAuditLogRecord(true, "AMQ601265: User myUser(producers)@", "is creating a core consumer");
       checkAuditLogRecord(true, "AMQ601501: User myUser(producers)@", "is consuming a message from exampleQueue");
+      checkAuditLogRecord(true, "AMQ601502: User myUser(producers)@", "is acknowledging a message from exampleQueue: AMQPStandardMessage");
    }
 }
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
index 4cd33b9..8e94d86 100644
--- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/logging/AuditLoggerTest.java
@@ -171,9 +171,10 @@ public class AuditLoggerTest extends AuditLoggerTestBase {
          Assert.assertNotNull(clientMessage);
          clientMessage = consumer.receive(5000);
          Assert.assertNotNull(clientMessage);
-         checkAuditLogRecord(true, "is consuming a message from");
       } finally {
          connection.close();
       }
+      checkAuditLogRecord(true, "is consuming a message from");
+      checkAuditLogRecord(true, "is acknowledging a message from");
    }
 }