You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/07/20 15:03:04 UTC

[activemq-artemis] 25/27: ARTEMIS-3896 clarify logging for transactional ops

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch new-logging
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 10e1e3001cc7b3d1d7ff4764f24529f6a3db1efe
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jul 15 23:54:40 2022 -0500

    ARTEMIS-3896 clarify logging for transactional ops
    
    Both audit logging and logging from the LoggingActiveMQServerPlugin are
    unclear as they relate to transactional sends and acks. Both essentially
    ignore the transaction which makes it appear that an operation has taken
    place when, in fact, it hasn't (e.g. a transactional ack is rolled back
    but the log indicates the ack went through).
    
    This commit fix this with the following changes:
    
     - Log details when a send or ack is added to a transaction.
     - Log details when the transaction is committed.
     - Log when the transaction is rolled back.
     - Include transaction details in the relevant DEBUG logs.
     - Simplify INFO level logging for sends & acks in
    LoggingActiveMQServerPlugin. Ensure details are in the DEBUG logs.
    
    Other changes:
    
     - Make capitalization more consistent in a handful of audit logs.
---
 .../apache/activemq/artemis/logs/AuditLogger.java  | 68 ++++++++++++------
 .../artemis/core/server/impl/QueueImpl.java        | 30 ++++++--
 .../core/server/impl/ServerSessionImpl.java        | 28 ++++++--
 .../server/plugin/ActiveMQServerMessagePlugin.java | 17 +++++
 .../plugin/impl/LoggingActiveMQServerPlugin.java   | 82 +++++++++++++++++-----
 .../impl/LoggingActiveMQServerPluginLogger.java    | 24 ++++---
 6 files changed, 187 insertions(+), 62 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 937c4e7d1d..c23e6091c3 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2130,12 +2130,12 @@ public interface AuditLogger {
     *
     * */
    //hot path log using a different logger
-   static void coreSendMessage(Subject user, String remoteAddress, String messageToString, Object context) {
-      MESSAGE_LOGGER.logCoreSendMessage(getCaller(user, remoteAddress), messageToString, context);
+   static void coreSendMessage(Subject user, String remoteAddress, String messageToString, Object context, String tx) {
+      MESSAGE_LOGGER.coreSendMessage(getCaller(user, remoteAddress), messageToString, context, tx);
    }
 
-   @LogMessage(id = 601500, value = "User {} is sending a message {}, with Context: {}", level = LogMessage.Level.INFO)
-   void logCoreSendMessage(String user, String messageToString, Object context);
+   @LogMessage(id = 601500, value = "User {0} sent a message {1}, context: {2}, transaction: {3}", level = LogMessage.Level.INFO)
+   void coreSendMessage(String user, String messageToString, Object context, String tx);
 
    //hot path log using a different logger
    static void coreConsumeMessage(Subject user, String remoteAddress, String queue, String message) {
@@ -2146,12 +2146,12 @@ public interface AuditLogger {
    void consumeMessage(String user, String address, String message);
 
    //hot path log using a different logger
-   static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message) {
-      MESSAGE_LOGGER.acknowledgeMessage(getCaller(user, remoteAddress), queue, message);
+   static void coreAcknowledgeMessage(Subject user, String remoteAddress, String queue, String message, String tx) {
+      MESSAGE_LOGGER.coreAcknowledgeMessage(getCaller(user, remoteAddress), queue, message, tx);
    }
 
-   @LogMessage(id = 601502, value = "User {} is acknowledging a message from {}: {}", level = LogMessage.Level.INFO)
-   void acknowledgeMessage(String user, String queue, String message);
+   @LogMessage(id = 601502, value = "User {} acknowledged message from {}: {}, transaction: {}", level = LogMessage.Level.INFO)
+   void coreAcknowledgeMessage(String user, String queue, String message, String tx);
 
    /*
     * This logger is focused on user interaction from the console or thru resource specific functions in the management layer/JMX
@@ -2161,35 +2161,35 @@ public interface AuditLogger {
       RESOURCE_LOGGER.createAddressSuccess(getCaller(), name, routingTypes);
    }
 
-   @LogMessage(id = 601701, value = "User {} successfully created Address: {} with routing types {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601701, value = "User {} successfully created address: {} with routing types {}", level = LogMessage.Level.INFO)
    void createAddressSuccess(String user, String name, String routingTypes);
 
    static void createAddressFailure(String name, String routingTypes) {
       RESOURCE_LOGGER.createAddressFailure(getCaller(), name, routingTypes);
    }
 
-   @LogMessage(id = 601702, value = "User {} failed to created Address: {} with routing types {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601702, value = "User {} failed to created address: {} with routing types {}", level = LogMessage.Level.INFO)
    void createAddressFailure(String user, String name, String routingTypes);
 
    static void updateAddressSuccess(String name, String routingTypes) {
       RESOURCE_LOGGER.updateAddressSuccess(getCaller(), name, routingTypes);
    }
 
-   @LogMessage(id = 601703, value = "User {} successfully updated Address: {} with routing types {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601703, value = "User {} successfully updated address: {} with routing types {}", level = LogMessage.Level.INFO)
    void updateAddressSuccess(String user, String name, String routingTypes);
 
    static void updateAddressFailure(String name, String routingTypes) {
       RESOURCE_LOGGER.updateAddressFailure(getCaller(), name, routingTypes);
    }
 
-   @LogMessage(id = 601704, value = "User {} successfully updated Address: {} with routing types {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601704, value = "User {} successfully updated address: {} with routing types {}", level = LogMessage.Level.INFO)
    void updateAddressFailure(String user, String name, String routingTypes);
 
    static void deleteAddressSuccess(String name) {
       RESOURCE_LOGGER.deleteAddressSuccess(getCaller(), name);
    }
 
-   @LogMessage(id = 601705, value = "User {} successfully deleted Address: {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601705, value = "User {} successfully deleted address: {}", level = LogMessage.Level.INFO)
    void deleteAddressSuccess(String user, String name);
 
 
@@ -2197,35 +2197,35 @@ public interface AuditLogger {
       RESOURCE_LOGGER.deleteAddressFailure(getCaller(), name);
    }
 
-   @LogMessage(id = 601706, value = "User {} failed to deleted Address: {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601706, value = "User {} failed to deleted address: {}", level = LogMessage.Level.INFO)
    void deleteAddressFailure(String user, String name);
 
    static void createQueueSuccess(String name, String address, String routingType) {
       RESOURCE_LOGGER.createQueueSuccess(getCaller(), name, address, routingType);
    }
 
-   @LogMessage(id = 601707, value = "User {} successfully created Queue: {} on Address: {} with routing type {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601707, value = "User {} successfully created queue: {} on address: {} with routing type {}", level = LogMessage.Level.INFO)
    void createQueueSuccess(String user, String name, String address, String routingType);
 
    static void createQueueFailure(String name, String address, String routingType) {
       RESOURCE_LOGGER.createQueueFailure(getCaller(), name, address, routingType);
    }
 
-   @LogMessage(id = 601708, value = "User {} failed to create Queue: {} on Address: {} with routing type {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601708, value = "User {} failed to create queue: {} on address: {} with routing type {}", level = LogMessage.Level.INFO)
    void createQueueFailure(String user, String name, String address, String routingType);
 
    static void updateQueueSuccess(String name, String routingType) {
       RESOURCE_LOGGER.updateQueueSuccess(getCaller(), name, routingType);
    }
 
-   @LogMessage(id = 601709, value = "User {} successfully updated Queue: {} with routing type {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601709, value = "User {} successfully updated queue: {} with routing type {}", level = LogMessage.Level.INFO)
    void updateQueueSuccess(String user, String name, String routingType);
 
    static void updateQueueFailure(String name, String routingType) {
       RESOURCE_LOGGER.updateQueueFailure(getCaller(), name, routingType);
    }
 
-   @LogMessage(id = 601710, value = "User {} failed to update Queue: {} with routing type {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601710, value = "User {} failed to update queue: {} with routing type {}", level = LogMessage.Level.INFO)
    void updateQueueFailure(String user, String name, String routingType);
 
 
@@ -2233,28 +2233,28 @@ public interface AuditLogger {
       RESOURCE_LOGGER.destroyQueueSuccess(getCaller(), name);
    }
 
-   @LogMessage(id = 601711, value = "User {} successfully deleted Queue: {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601711, value = "User {} successfully deleted queue: {}", level = LogMessage.Level.INFO)
    void destroyQueueSuccess(String user, String name);
 
    static void destroyQueueFailure(String name) {
       RESOURCE_LOGGER.destroyQueueFailure(getCaller(), name);
    }
 
-   @LogMessage(id = 601712, value = "User {} failed to delete Queue: {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601712, value = "User {} failed to delete queue: {}", level = LogMessage.Level.INFO)
    void destroyQueueFailure(String user, String name);
 
    static void removeMessagesSuccess(int removed, String queue) {
       RESOURCE_LOGGER.removeMessagesSuccess(getCaller(), removed, queue);
    }
 
-   @LogMessage(id = 601713, value = "User {} has removed {} messages from Queue: {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601713, value = "User {} has removed {} messages from queue: {}", level = LogMessage.Level.INFO)
    void removeMessagesSuccess(String user, int removed, String queue);
 
    static void removeMessagesFailure(String queue) {
       RESOURCE_LOGGER.removeMessagesFailure(getCaller(), queue);
    }
 
-   @LogMessage(id = 601714, value = "User {} failed to remove messages from Queue: {}", level = LogMessage.Level.INFO)
+   @LogMessage(id = 601714, value = "User {} failed to remove messages from queue: {}", level = LogMessage.Level.INFO)
    void removeMessagesFailure(String user, String queue);
 
    static void userSuccesfullyAuthenticatedInAudit(Subject subject, String remoteAddress) {
@@ -2582,4 +2582,28 @@ public interface AuditLogger {
 
    @LogMessage(id = 601758, value = "User {} is calling schedulePageCleanup on address: {}", level = LogMessage.Level.INFO)
    void schedulePageCleanup(String user, Object address);
+
+   //hot path log using a different logger
+   static void addAckToTransaction(Subject user, String remoteAddress, String queue, String message, String tx) {
+      MESSAGE_LOGGER.addAckToTransaction(getCaller(user, remoteAddress), queue, message, tx);
+   }
+
+   @LogMessage(id = 601759, value = "User {} added acknowledgement of a message from {}: {} to transaction: {}", level = LogMessage.Level.INFO)
+   void addAckToTransaction(String user, String queue, String message, String tx);
+
+   //hot path log using a different logger
+   static void addSendToTransaction(Subject user, String remoteAddress, String messageToString, String tx) {
+      MESSAGE_LOGGER.addSendToTransaction(getCaller(user, remoteAddress), messageToString, tx);
+   }
+
+   @LogMessage(id = 601760, value = "User {} added a message send for: {} to transaction: {}", level = LogMessage.Level.INFO)
+   void addSendToTransaction(String user, String messageToString, String tx);
+
+   //hot path log using a different logger
+   static void rolledBackTransaction(Subject user, String remoteAddress, String tx, String resource) {
+      MESSAGE_LOGGER.rolledBackTransaction(getCaller(user, remoteAddress), tx, resource);
+   }
+
+   @LogMessage(id = 601761, value = "User {} rolled back transaction {} involving {}", level = LogMessage.Level.INFO)
+   void rolledBackTransaction(String user, String tx, String resource);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 186373f9f9..1362c33fa2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import javax.security.auth.Subject;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.math.BigDecimal;
@@ -1883,19 +1884,38 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          }
 
          if (AuditLogger.isMessageLoggingEnabled()) {
-            ServerSession session = null;
             // it's possible for the consumer to be null (e.g. acking the message administratively)
-            if (consumer != null) {
-               session = server.getSessionByID(consumer.getSessionID());
+            final ServerSession session = consumer != null ? server.getSessionByID(consumer.getSessionID()) : null;
+            final Subject subject = session == null ? null : session.getRemotingConnection().getAuditSubject();
+            final String remoteAddress = session == null ? null : session.getRemotingConnection().getRemoteAddress();
+
+            if (transactional) {
+               AuditLogger.addAckToTransaction(subject, remoteAddress, getName().toString(), ref.getMessage().toString(), tx.toString());
+               tx.addOperation(new TransactionOperationAbstract() {
+                  @Override
+                  public void afterCommit(Transaction tx) {
+                     auditLogAck(subject, remoteAddress, ref, tx);
+                  }
+
+                  @Override
+                  public void afterRollback(Transaction tx) {
+                     AuditLogger.rolledBackTransaction(subject, remoteAddress, tx.toString(), ref.toString());
+                  }
+               });
+            } else {
+               auditLogAck(subject, remoteAddress, ref, tx);
             }
-            AuditLogger.coreAcknowledgeMessage(session == null ? null : session.getRemotingConnection().getAuditSubject(), session == null ? null : session.getRemotingConnection().getRemoteAddress(), getName().toString(), ref.getMessage().toString());
          }
          if (server != null && server.hasBrokerMessagePlugins()) {
-            server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(ref, reason, consumer));
+            server.callBrokerMessagePlugins(plugin -> plugin.messageAcknowledged(tx, ref, reason, consumer));
          }
       }
    }
 
+   private void auditLogAck(Subject subject, String remoteAddress, MessageReference ref, Transaction tx) {
+      AuditLogger.coreAcknowledgeMessage(subject, remoteAddress, getName().toString(), ref.getMessage().toString(), tx == null ? null : tx.toString());
+   }
+
    @Override
    public void reacknowledge(final Transaction tx, final MessageReference ref) throws Exception {
       Message message = ref.getMessage();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 0c0a38e86e..a9f3fe3fe9 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1813,10 +1813,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             message.setMessageID(id);
          }
 
-         if (AuditLogger.isMessageLoggingEnabled()) {
-            AuditLogger.coreSendMessage(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), message.toString(), routingContext);
-         }
-
          SimpleString address = message.getAddressSimpleString();
 
          if (defaultAddress == null && address != null) {
@@ -1845,6 +1841,25 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
             result = doSend(tx, message, address, direct, noAutoCreateQueue, routingContext);
          }
 
+         if (AuditLogger.isMessageLoggingEnabled()) {
+            if (tx != null && !autoCommitSends) {
+               AuditLogger.addSendToTransaction(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), message.toString(), tx.toString());
+               tx.addOperation(new TransactionOperationAbstract() {
+                  @Override
+                  public void afterCommit(Transaction tx) {
+                     auditLogSend(message, tx);
+                  }
+
+                  @Override
+                  public void afterRollback(Transaction tx) {
+                     AuditLogger.rolledBackTransaction(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), tx.toString(), message.toString());
+                  }
+               });
+            } else {
+               auditLogSend(message, null);
+            }
+         }
+
       } catch (Exception e) {
          if (server.hasBrokerMessagePlugins()) {
             server.callBrokerMessagePlugins(plugin -> plugin.onSendException(this, tx, message, direct, noAutoCreateQueue, e));
@@ -1852,11 +1867,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
          throw e;
       }
       if (server.hasBrokerMessagePlugins()) {
-         server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, tx, message, direct, noAutoCreateQueue, result));
+         server.callBrokerMessagePlugins(plugin -> plugin.afterSend(this, autoCommitSends ? null : tx, message, direct, noAutoCreateQueue, result));
       }
       return result;
    }
 
+   private void auditLogSend(Message message, Transaction tx) {
+      AuditLogger.coreSendMessage(remotingConnection.getAuditSubject(), remotingConnection.getRemoteAddress(), message.toString(), routingContext, tx == null ? null : tx.toString());
+   }
 
    @Override
    public void requestProducerCredits(SimpleString address, final int credits) throws Exception {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
index a211b338c7..06e7378391 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/ActiveMQServerMessagePlugin.java
@@ -270,8 +270,25 @@ public interface ActiveMQServerMessagePlugin extends ActiveMQServerBasePlugin {
     * @throws ActiveMQException
     *
     */
+   @Deprecated
    default void messageAcknowledged(MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
       //by default call the old method for backwards compatibility
       this.messageAcknowledged(ref, reason);
    }
+
+   /**
+    * A message has been acknowledged
+    *
+    * @param tx The transaction associated with the ack
+    * @param ref The acked message
+    * @param reason The ack reason
+    * @param consumer the Consumer that acknowledged the message - this field is optional
+    * and can be null
+    * @throws ActiveMQException
+    *
+    */
+   default void messageAcknowledged(Transaction tx, MessageReference ref, AckReason reason, ServerConsumer consumer) throws ActiveMQException {
+      //by default call the old method for backwards compatibility
+      this.messageAcknowledged(ref, reason, consumer);
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
index 12202c001f..8721b0571e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPlugin.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 import org.apache.activemq.artemis.utils.critical.CriticalComponent;
@@ -432,7 +433,7 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
    /**
     * After a message is sent
     *
-    * @param session           the session that sends the message
+    * @param session
     * @param tx
     * @param message
     * @param direct
@@ -447,17 +448,40 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
                          boolean direct,
                          boolean noAutoCreateQueue,
                          RoutingStatus result) throws ActiveMQException {
-
-      if (logAll || logSendingEvents) {
-
-         //details - debug level
-         LoggingActiveMQServerPluginLogger.LOGGER.afterSendDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), message, (session == null ? UNAVAILABLE : session.getName()), tx, session, direct, noAutoCreateQueue);
-
-         //info level log
-         LoggingActiveMQServerPluginLogger.LOGGER.afterSend((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), (session == null ? UNAVAILABLE : session.getName()), (session == null ? UNAVAILABLE : session.getConnectionID().toString()), result);
+      if (logAll || logDeliveringEvents) {
+         LoggingActiveMQServerPluginLogger.LOGGER.afterSendDetails(message,
+                                                                   result.toString(),
+                                                                   tx,
+                                                                   (session == null ? UNAVAILABLE : session.getName()),
+                                                                   (session == null ? UNAVAILABLE : session.getConnectionID().toString()),
+                                                                   direct,
+                                                                   noAutoCreateQueue);
+         if (tx != null) {
+            tx.addOperation(new TransactionOperationAbstract() {
+               @Override
+               public void afterCommit(Transaction tx) {
+                  logSend(tx, message, result);
+               }
+
+               @Override
+               public void afterRollback(Transaction tx) {
+                  LoggingActiveMQServerPluginLogger.LOGGER.rolledBackTransaction(tx, message.toString());
+               }
+            });
+         } else {
+            logSend(tx, message, result);
+         }
       }
    }
 
+   private void logSend(Transaction tx,
+                        Message message,
+                        RoutingStatus result) {
+      LoggingActiveMQServerPluginLogger.LOGGER.afterSend((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+                                                         result,
+                                                         (tx == null ? UNAVAILABLE : tx.toString()));
+   }
+
    @Override
    public void onSendException(ServerSession session,
                                Transaction tx,
@@ -590,26 +614,46 @@ public class LoggingActiveMQServerPlugin implements ActiveMQServerPlugin, Serial
    /**
     * A message has been acknowledged
     *
-    * @param ref    The acked message
-    * @param reason The ack reason
+    * @param tx       The transaction for the ack
+    * @param ref      The acked message
+    * @param reason   The ack reason
+    * @param consumer The consumer acking the ref
     * @throws ActiveMQException
     */
    @Override
-   public void messageAcknowledged(MessageReference ref,
-                                   AckReason reason,
-                                   ServerConsumer consumer) throws ActiveMQException {
+   public void messageAcknowledged(final Transaction tx, final MessageReference ref, final AckReason reason, final ServerConsumer consumer) throws ActiveMQException {
       if (logAll || logDeliveringEvents) {
-
-         //details - debug logging
-         LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledgedDetails(ref, reason);
-
          Message message = (ref == null ? null : ref.getMessage());
          Queue queue = (ref == null ? null : ref.getQueue());
 
-         LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())), (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null), (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())), (queue == null ? UNAVAILABLE : queue.getName().toString()), reason);
+         LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledgedDetails((message == null ? UNAVAILABLE : Long.toString(message.getMessageID())),
+                                                                             (consumer == null ? UNAVAILABLE : consumer.getSessionID() != null ? consumer.getSessionID() : null),
+                                                                             (consumer == null ? UNAVAILABLE : Long.toString(consumer.getID())),
+                                                                             (queue == null ? UNAVAILABLE : queue.getName().toString()),
+                                                                             (tx == null ? UNAVAILABLE : tx.toString()),
+                                                                             reason);
+         if (tx != null) {
+            tx.addOperation(new TransactionOperationAbstract() {
+               @Override
+               public void afterCommit(Transaction tx) {
+                  logAck(tx, ref);
+               }
+
+               @Override
+               public void afterRollback(Transaction tx) {
+                  LoggingActiveMQServerPluginLogger.LOGGER.rolledBackTransaction(tx, ref.toString());
+               }
+            });
+         } else {
+            logAck(tx, ref);
+         }
       }
    }
 
+   private void logAck(Transaction tx, MessageReference ref) {
+      LoggingActiveMQServerPluginLogger.LOGGER.messageAcknowledged(ref, tx);
+   }
+
    /**
     * Before a bridge is deployed
     *
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
index d4a8ce3811..e54b76aab4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/LoggingActiveMQServerPluginLogger.java
@@ -80,8 +80,8 @@ public interface LoggingActiveMQServerPluginLogger {
                           boolean removeConsumers,
                           boolean autoDeleteAddress);
 
-   @LogMessage(id = 841009, value = "sent message with ID: {}, session name: {}, session connectionID: {}, result: {}", level = LogMessage.Level.INFO)
-   void afterSend(String messageID, String sessionName, String sessionConnectionID, RoutingStatus result);
+   @LogMessage(id = 841009, value = "sent message with ID: {}, result: {}, transaction: {}", level = LogMessage.Level.INFO)
+   void afterSend(String messageID, RoutingStatus result, String tx);
 
    @LogMessage(id = 841010, value = "routed message with ID: {}, result: {}", level = LogMessage.Level.INFO)
    void afterMessageRoute(String messageID, RoutingStatus result);
@@ -99,8 +99,8 @@ public interface LoggingActiveMQServerPluginLogger {
    @LogMessage(id = 841013, value = "expired message: {}, messageExpiryAddress: {}", level = LogMessage.Level.INFO)
    void messageExpired(MessageReference message, SimpleString messageExpiryAddress);
 
-   @LogMessage(id = 841014, value = "acknowledged message ID: {}, messageRef sessionID: {}, with messageRef consumerID: {}, messageRef QueueName: {}," + "  with ackReason: {}", level = LogMessage.Level.INFO)
-   void messageAcknowledged(String messageID, String sessionID, String consumerID, String queueName, AckReason reason);
+   @LogMessage(id = 841014, value = "acknowledged message: {}, with transaction: {}", level = LogMessage.Level.INFO)
+   void messageAcknowledged(MessageReference ref, Transaction tx);
 
    @LogMessage(id = 841015, value = "deployed bridge: {}", level = LogMessage.Level.INFO)
    void afterDeployBridge(Bridge config);
@@ -164,12 +164,12 @@ public interface LoggingActiveMQServerPluginLogger {
                    boolean direct,
                    boolean noAutoCreateQueue);
 
-   @LogMessage(id = 843009, value = "message ID: {}, message {}, session name: {} with tx: {}, session: {}, direct: {}," + " noAutoCreateQueue: {}", level = LogMessage.Level.DEBUG)
-   void afterSendDetails(String messageID,
-                         org.apache.activemq.artemis.api.core.Message message,
-                         String sessionName,
+   @LogMessage(id = 843009, value = "afterSend message: {}, result: {}, transaction: {}, session: {}, connection: {}, direct: {}, noAutoCreateQueue: {}", level = LogMessage.Level.DEBUG)
+   void afterSendDetails(org.apache.activemq.artemis.api.core.Message message,
+                         String result,
                          Transaction tx,
-                         ServerSession session,
+                         String sessionName,
+                         String connectionID,
                          boolean direct,
                          boolean noAutoCreateQueue);
 
@@ -197,8 +197,8 @@ public interface LoggingActiveMQServerPluginLogger {
                             MessageReference reference,
                             ServerConsumer consumer);
 
-   @LogMessage(id = 843014, value = "acknowledged message: {}, with ackReason: {}", level = LogMessage.Level.DEBUG)
-   void messageAcknowledgedDetails(MessageReference ref, AckReason reason);
+   @LogMessage(id = 843014, value = "messageAcknowledged ID: {}, sessionID: {}, consumerID: {}, queue: {}, transaction: {}, ackReason: {}", level = LogMessage.Level.DEBUG)
+   void messageAcknowledgedDetails(String messageID, String sessionID, String consumerID, String queueName, String tx, AckReason reason);
 
    @LogMessage(id = 843015, value = "beforeDeployBridge called with bridgeConfiguration: {}", level = LogMessage.Level.DEBUG)
    void beforeDeployBridge(BridgeConfiguration config);
@@ -218,4 +218,6 @@ public interface LoggingActiveMQServerPluginLogger {
                                    boolean direct,
                                    boolean rejectDuplicates);
 
+   @LogMessage(id = 843018, value = "rolled back transaction {} involving {}", level = LogMessage.Level.DEBUG)
+   void rolledBackTransaction(Transaction tx, String resource);
 }